From db9b70feb09252dac5d0b49b2bb33f1a1f932853 Mon Sep 17 00:00:00 2001 From: Enrico Ottonello Date: Tue, 16 Jun 2020 02:36:16 +0200 Subject: [PATCH] new node and workflow to index on ES --- .../ariadneplus/elasticsearch/BulkUpload.java | 8 + .../model/ArchaeologicalResourceType.java | 52 ++++++ .../model/ArcheologicalResourceType.java | 52 ------ .../model/AriadneCatalogEntry.java | 10 +- .../ariadneplus/graphdb/GraphDBClient.java | 18 +- .../ariadneplus/reader/ResourceManager.java | 6 +- .../reader/RunSPARQLQueryService.java | 13 +- .../ariadneplus/reader/json/ParseRDFJSON.java | 23 +++ .../src/main/resources/application.properties | 9 +- .../src/main/resources/query.txt | 164 ------------------ .../GraphDbReaderAndESIndexTest.java | 14 +- .../workflows/nodes/IndexOnESJobNode.java | 155 +++++++++++++++++ ...licationContext-ariadneplus-msro-nodes.xml | 2 + .../workflows/repo-hi/index_on_es_wf.xml.st | 54 ++++++ .../workflows/index_on_es_template.xml | 32 ++++ .../workflows/repo_hi_index_on_es.xml | 60 +++++++ 16 files changed, 424 insertions(+), 248 deletions(-) create mode 100644 dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/elasticsearch/model/ArchaeologicalResourceType.java delete mode 100644 dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/elasticsearch/model/ArcheologicalResourceType.java delete mode 100644 dnet-ariadneplus-graphdb-publisher/src/main/resources/query.txt create mode 100644 dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/IndexOnESJobNode.java create mode 100644 dnet-ariadneplus/src/main/resources/eu/dnetlib/ariadneplus/workflows/repo-hi/index_on_es_wf.xml.st create mode 100644 dnet-ariadneplus/src/main/resources/eu/dnetlib/bootstrap/profiles/workflows/index_on_es_template.xml create mode 100644 dnet-ariadneplus/src/main/resources/eu/dnetlib/bootstrap/profiles/workflows/repo_hi_index_on_es.xml diff --git a/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/elasticsearch/BulkUpload.java b/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/elasticsearch/BulkUpload.java index 1efe85d..294e29c 100644 --- a/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/elasticsearch/BulkUpload.java +++ b/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/elasticsearch/BulkUpload.java @@ -33,6 +33,14 @@ public class BulkUpload { private RestHighLevelClient client; + public void init(String elasticSearchHostName, String elasticSearchIndexName) throws IOException { + this.elasticSearchIndexName = elasticSearchIndexName; + client = new RestHighLevelClient( + RestClient.builder( + new HttpHost(elasticSearchHostName,9200,"http"))); + + } + @PostConstruct public void init() throws IOException { client = new RestHighLevelClient( diff --git a/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/elasticsearch/model/ArchaeologicalResourceType.java b/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/elasticsearch/model/ArchaeologicalResourceType.java new file mode 100644 index 0000000..ae7cce6 --- /dev/null +++ b/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/elasticsearch/model/ArchaeologicalResourceType.java @@ -0,0 +1,52 @@ +package eu.dnetlib.ariadneplus.elasticsearch.model; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; + +import java.util.Map; + +public class ArchaeologicalResourceType { + private long id = 100000; + private String name; + + public ArchaeologicalResourceType() { + } + +// public static ArchaeologicalResourceType fromRDFJson(JsonElement json) { +// ArchaeologicalResourceType art = new ArchaeologicalResourceType(); +// for (Map.Entry entry : json.getAsJsonObject().entrySet()){ +// switch (entry.getKey()){ +// case "https://www.ariadne-infrastructure.eu/property/id" : +// art.setId(entry.getValue().getAsJsonArray().get(0).getAsJsonObject().get("value").getAsString()); +// break; +// case "https://www.ariadne-infrastructure.eu/property/name": +// String tmp = entry.getValue().getAsJsonArray().get(0).getAsJsonObject().get("value").getAsString(); +// art.setName(tmp); +// break; +// +// } +// } +// +// return art; +// } + + public long getId() { + return id; + } + + public void setId(long id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public static ArchaeologicalResourceType fromJson(String json){ + return new Gson().fromJson(json, ArchaeologicalResourceType.class); + } +} diff --git a/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/elasticsearch/model/ArcheologicalResourceType.java b/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/elasticsearch/model/ArcheologicalResourceType.java deleted file mode 100644 index 62fa44d..0000000 --- a/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/elasticsearch/model/ArcheologicalResourceType.java +++ /dev/null @@ -1,52 +0,0 @@ -package eu.dnetlib.ariadneplus.elasticsearch.model; - -import com.google.gson.Gson; -import com.google.gson.JsonElement; - -import java.util.Map; - -public class ArcheologicalResourceType { - private String id; - private String name; - - public ArcheologicalResourceType() { - } - - public static ArcheologicalResourceType fromRDFJson(JsonElement json) { - ArcheologicalResourceType art = new ArcheologicalResourceType(); - for (Map.Entry entry : json.getAsJsonObject().entrySet()){ - switch (entry.getKey()){ - case "https://www.ariadne-infrastructure.eu/property/id" : - art.setId(entry.getValue().getAsJsonArray().get(0).getAsJsonObject().get("value").getAsString()); - break; - case "https://www.ariadne-infrastructure.eu/property/name": - String tmp = entry.getValue().getAsJsonArray().get(0).getAsJsonObject().get("value").getAsString(); - art.setName(tmp); - break; - - } - } - - return art; - } - - public String getId() { - return id; - } - - public void setId(String id) { - this.id = id; - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public static ArcheologicalResourceType fromJson(String json){ - return new Gson().fromJson(json, ArcheologicalResourceType.class); - } -} diff --git a/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/elasticsearch/model/AriadneCatalogEntry.java b/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/elasticsearch/model/AriadneCatalogEntry.java index 15dd15b..5467e33 100644 --- a/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/elasticsearch/model/AriadneCatalogEntry.java +++ b/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/elasticsearch/model/AriadneCatalogEntry.java @@ -9,7 +9,7 @@ public class AriadneCatalogEntry { private List aatSubjects; private String accessPolicy; private String accessRights; - private ArcheologicalResourceType archeologicalResourceType; + private ArchaeologicalResourceType archaeologicalResourceType; private String contactPoint; private List contributor; private List creator; @@ -80,12 +80,12 @@ public class AriadneCatalogEntry { this.accessRights = accessRights; } - public ArcheologicalResourceType getArcheologicalResourceType() { - return archeologicalResourceType; + public ArchaeologicalResourceType getArchaeologicalResourceType() { + return archaeologicalResourceType; } - public void setArcheologicalResourceType(ArcheologicalResourceType archeologicalResourceType) { - this.archeologicalResourceType = archeologicalResourceType; + public void setArchaeologicalResourceType(ArchaeologicalResourceType archaeologicalResourceType) { + this.archaeologicalResourceType = archaeologicalResourceType; } public String getContactPoint() { diff --git a/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/graphdb/GraphDBClient.java b/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/graphdb/GraphDBClient.java index c1b3f4b..d685986 100644 --- a/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/graphdb/GraphDBClient.java +++ b/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/graphdb/GraphDBClient.java @@ -354,16 +354,18 @@ public class GraphDBClient { runSPQRLQuery.setParser(parseRDFJSON); runSPQRLQuery.setResourceManager(resourceManager); runSPQRLQuery.setBulkUpload(bulkUpload); - String recordId = ""; - List recordIds = Arrays.asList(recordId); -// List recordIds = runSPQRLQuery.selectRecordIds(datasource, collectionId); +// String recordId = "https://ariadne-infrastructure.eu/aocat/Resource/02E4F4B5-24B7-3AD7-B460-CFA8B1F0BD1F"; +// List recordIds = Arrays.asList(recordId); + List recordIds = runSPQRLQuery.selectRecordIds(datasource, collectionId); final ClassPathResource queryTemplateResource = new ClassPathResource("eu/dnetlib/ariadneplus/sparql/read_record_data_template.sparql"); String queryTemplate = IOUtils.toString(queryTemplateResource.getInputStream(), StandardCharsets.UTF_8.name()); - runSPQRLQuery.executeMultipleQueryGraph(queryTemplate, recordIds, datasource, collectionId); -// List collectionResourceId = runSPQRLQuery.selectCollectionId(datasource, collectionId); -// final ClassPathResource selectCollectionTemplateRes = new ClassPathResource("eu/dnetlib/ariadneplus/sparql/read_collection_data_template.sparql"); -// String selectCollectionTemplate = IOUtils.toString(selectCollectionTemplateRes.getInputStream(), StandardCharsets.UTF_8.name()); -// runSPQRLQuery.executeMultipleQueryGraph(selectCollectionTemplate, collectionResourceId, datasource, collectionId); + boolean isCollection = false; + runSPQRLQuery.executeMultipleQueryGraph(queryTemplate, recordIds, datasource, collectionId, isCollection); + List collectionResourceId = runSPQRLQuery.selectCollectionId(datasource, collectionId); + final ClassPathResource selectCollectionTemplateRes = new ClassPathResource("eu/dnetlib/ariadneplus/sparql/read_collection_data_template.sparql"); + String selectCollectionTemplate = IOUtils.toString(selectCollectionTemplateRes.getInputStream(), StandardCharsets.UTF_8.name()); + isCollection = true; + runSPQRLQuery.executeMultipleQueryGraph(selectCollectionTemplate, collectionResourceId, datasource, collectionId, isCollection); }catch(Throwable e){ log.error(e); throw new AriadnePlusPublisherException(e); diff --git a/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/reader/ResourceManager.java b/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/reader/ResourceManager.java index cb87c7b..73b2196 100644 --- a/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/reader/ResourceManager.java +++ b/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/reader/ResourceManager.java @@ -49,8 +49,8 @@ public class ResourceManager { this.type_path = type_path; this.general_classpath = general_classpath; this.exclude_predicates = exclude_predicates; - propertiesMap = new PropertiesMap(); - propertiesMap.fill(spec); + this.spec = spec; + init(); } @PostConstruct @@ -87,7 +87,7 @@ public class ResourceManager { if(entry instanceof LinkedHashMap){ LinkedHashMap tmp = (LinkedHashMap)((JSONArray)((LinkedHashMap)entry).get(type_path)).get(0); class_name = (String)tmp.get("value"); - if (class_name.equals("provided record")) { + if (class_name.equals("provided record") || class_name.equals("Dataset Collection")) { class_name = "AriadneCatalogEntry"; } } diff --git a/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/reader/RunSPARQLQueryService.java b/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/reader/RunSPARQLQueryService.java index 8fac2f4..fd79685 100644 --- a/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/reader/RunSPARQLQueryService.java +++ b/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/reader/RunSPARQLQueryService.java @@ -59,19 +59,19 @@ public class RunSPARQLQueryService { manager.shutDown(); } - public String executeMultipleQueryGraph(String queryTemplate, List recordIds, String datasource, String collectionId){ + public String executeMultipleQueryGraph(String queryTemplate, List recordIds, String datasource, String collectionId, boolean isCollection){ if (queryTemplate==null) return null; final String selectQueryTemplate = queryTemplate.replaceAll("%datasource", datasource).replaceAll("%collectionId", collectionId); recordIds.forEach(recordId -> { - executeQueryGraph(selectQueryTemplate, recordId); + executeQueryGraph(selectQueryTemplate, recordId, isCollection); }); return "ok"; } - private String executeQueryGraph(String selectQueryTemplate, String recordId){ - log.debug("Retrieving "+recordId+" ..."); - String query = selectQueryTemplate.replaceAll("%record", recordId); + private String executeQueryGraph(String selectQueryTemplate, String recordId, boolean isCollection){ + log.debug("Retrieving "+recordId+" - isCollection:"+isCollection ); + String query = selectQueryTemplate.replaceAll("%record", "<"+recordId+">"); openConnection(); StringWriter recordWriter = null; Model resultsModel = null; @@ -90,6 +90,9 @@ public class RunSPARQLQueryService { recordWriter = new StringWriter(); RDFWriter rdfRecordWriter = Rio.createWriter(RDFFormat.RDFJSON, recordWriter); Rio.write(resultsModel, rdfRecordWriter); + if (isCollection) { + parser.setCollection(true); + } parser.parse(recordWriter.toString()); resourceManager.manage(parser); bulkUpload.index(resourceManager); diff --git a/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/reader/json/ParseRDFJSON.java b/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/reader/json/ParseRDFJSON.java index 1ffd590..4a2549f 100644 --- a/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/reader/json/ParseRDFJSON.java +++ b/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/reader/json/ParseRDFJSON.java @@ -3,6 +3,9 @@ package eu.dnetlib.ariadneplus.reader.json; import java.util.Iterator; import java.util.LinkedHashMap; +import eu.dnetlib.ariadneplus.reader.RunSPARQLQueryService; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; @@ -18,11 +21,18 @@ import net.minidev.json.parser.ParseException; @Service public class ParseRDFJSON { + private static final Log log = LogFactory.getLog(ParseRDFJSON.class); + static JSONObject map ; @Value("${catalog.entry.path}") private String catalogEntryJsonPath; + @Value("${catalog.entry.collection.path}") + private String catalogEntryCollectionJsonPath; + + private boolean isCollection = false; + private String json; private Iterator it ; @@ -45,8 +55,10 @@ public class ParseRDFJSON { setJson(json); fillMap(); DocumentContext jsonContext = JsonPath.parse(json); + log.debug(getCatalogEntryJsonPath()); JSONArray entries = jsonContext.read(getCatalogEntryJsonPath()); int size = entries.size(); + log.debug("num elements in json: "+size); it = entries.iterator(); } @@ -63,10 +75,21 @@ public class ParseRDFJSON { } public String getCatalogEntryJsonPath() { + if (isCollection) { + return catalogEntryCollectionJsonPath; + } return catalogEntryJsonPath; } public void setCatalogEntryJsonPath(String catalogEntryJsonPath) { this.catalogEntryJsonPath = catalogEntryJsonPath; } + + public boolean isCollection() { + return isCollection; + } + + public void setCollection(boolean collection) { + isCollection = collection; + } } diff --git a/dnet-ariadneplus-graphdb-publisher/src/main/resources/application.properties b/dnet-ariadneplus-graphdb-publisher/src/main/resources/application.properties index f2ca6cb..09399de 100644 --- a/dnet-ariadneplus-graphdb-publisher/src/main/resources/application.properties +++ b/dnet-ariadneplus-graphdb-publisher/src/main/resources/application.properties @@ -5,7 +5,7 @@ server.port=8281 graphdb.serverUrl=http://graphdb-test.ariadne.d4science.org:7200 graphdb.writer.user=writer -graphdb.writer.pwd=******** +graphdb.writer.pwd=***** graphdb.repository=ariadneplus-ts01 graphdb.baseURI=https://ariadne-infrastructure.eu/ @@ -13,6 +13,7 @@ elasticsearch.hostname=elastic-test.ariadne.d4science.org elasticsearch.indexname=catalog_test catalog.entry.path=$[*][?(@['https://www.ariadne-infrastructure.eu/property/resourceType'][0]['value']=='provided record')] +catalog.entry.collection.path=$[*][?(@['https://www.ariadne-infrastructure.eu/property/resourceType'][0]['value']=='Dataset Collection')] general.classpath=eu.dnetlib.ariadneplus.elasticsearch.model. type.path=https://www.ariadne-infrastructure.eu/property/resourceType exclude.predicates=["https://www.ariadne-infrastructure.eu/property/resourceType", "http://www.w3.org/1999/02/22-rdf-syntax-ns#type"] @@ -95,7 +96,7 @@ class.map.specifications={\ "element_type": "java.lang.String"\ }}\ },\ -"ArcheologicalResourceType": {\ +"ArchaeologicalResourceType": {\ "class_type": "unique",\ "mappings": {\ "https://www.ariadne-infrastructure.eu/property/id": {\ @@ -204,8 +205,8 @@ class.map.specifications={\ "substring": "no"\ },\ "https://www.ariadne-infrastructure.eu/property/archeologicalResourceType": {\ -"class_field": "ArcheologicalResourceType",\ -"external_reference": "ArcheologicalResourceType",\ +"class_field": "ArchaeologicalResourceType",\ +"external_reference": "ArchaeologicalResourceType",\ "substring": "no"\ },\ "https://www.ariadne-infrastructure.eu/property/issued": {\ diff --git a/dnet-ariadneplus-graphdb-publisher/src/main/resources/query.txt b/dnet-ariadneplus-graphdb-publisher/src/main/resources/query.txt deleted file mode 100644 index 0e669bd..0000000 --- a/dnet-ariadneplus-graphdb-publisher/src/main/resources/query.txt +++ /dev/null @@ -1,164 +0,0 @@ -PREFIX aocat: -PREFIX rdfs: -PREFIX skos: -PREFIX aoprop: -PREFIX onto: -PREFIX ariadneplus: -PREFIX time: - -CONSTRUCT { -?record aoprop:identifier ?record . -?record aoprop:originalId ?originalId . -?record aoprop:issued ?issued . -?record aoprop:modified ?modified . -?record aoprop:partOf ?partOf . -?record aoprop:creator ?creator . -?creator aoprop:name ?creatorName . -?creator aoprop:email ?creatorEmail . -?record aoprop:contributor ?contributor . -?contributor aoprop:name ?contributorName . -?contributor aoprop:email ?contributorEmail . -?record aoprop:legalResponsible ?legalResponsible . -?legalResponsible aoprop:name ?legalResponsibleName . -?legalResponsible aoprop:email ?legalResponsibleEmail . -?record aoprop:owner ?owner . -?owner aoprop:name ?ownerName . -?owner aoprop:email ?ownerEmail . -?record aoprop:publisher ?publisher . -?publisher aoprop:name ?publisherName . -?publisher aoprop:email ?publisherEmail . -?record aoprop:accessPolicy ?accessPolicy . -?record aoprop:accessRights ?accessRights . -?record aoprop:landingPage ?landingPage . -?record aoprop:spatialRegion ?spatialRegion . -?spatialRegion aoprop:placeName ?spatialPlaceName . -?spatialRegion aoprop:spatialCoordinateSystem ?spatialCoordinateSystem . -?record aoprop:spatialRegionPoint ?spatialRegionPoint . -?spatialRegionPoint aoprop:lat ?spatialLocationLat . -?spatialRegionPoint aoprop:lon ?spatialLocationLon . -?record aoprop:spatialRegionBox ?spatialRegionBox . -?spatialRegionBox aoprop:boxMaxLat ?spatialLocationBBMaxLat . -?spatialRegionBox aoprop:boxMaxLon ?spatialLocationBBMaxLon . -?spatialRegionBox aoprop:boxMinLat ?spatialLocationBBMinLat . -?spatialRegionBox aoprop:boxMinLon ?spatialLocationBBMinLon . -?record aoprop:uri ?temporal . -?temporal aoprop:periodName ?temporalPeriodName . -?temporal aoprop:from ?temporalFrom . -?temporal aoprop:until ?temporalUntil . -?record aoprop:uri ?temporalNative . -?temporalNative aoprop:periodName ?temporalNativePeriodName . -?record aoprop:archeologicalResourceType ?archeologicalResourceType . -?archeologicalResourceType aoprop:name ?archeologicalResourceTypeName . -?record aoprop:resourceType ?resourceType . -?record aoprop:nativeSubject ?nativeSubject . -?nativeSubject aoprop:prefLabel ?nativeSubjectPrefLabel . -?nativeSubject aoprop:rdfAbout ?nativeSubject . -?record aoprop:derivedSubject ?derivedSubject . -?derivedSubject aoprop:prefLabel ?derivedSubjectPrefLabel . -?derivedSubject aoprop:source "Getty AAT" . -?record aoprop:aatSubjects ?derivedSubject . -?derivedSubject aoprop:id ?derivedSubject . -?derivedSubject aoprop:label ?derivedSubjectPrefLabel . -?record aoprop:title ?title . -?record aoprop:description ?description . -?record aoprop:language ?language . -} -from -from -from -from -from -where { - ?record aocat:has_language / skos:prefLabel ?language . - ?record aocat:has_original_id ?originalId . - ?record aocat:is_part_of ?partOf . - ?record aocat:has_creator ?creator . - ?creator aocat:has_name ?creatorName . - ?record aocat:has_title ?title . - ?record aocat:has_type / skos:prefLabel ?resourceType . - ?record aocat:has_native_subject ?nativeSubject . - ?nativeSubject skos:prefLabel ?nativeSubjectPrefLabel . - optional { - ?record aocat:has_derived_subject ?derivedSubject . - ?derivedSubject skos:prefLabel ?derivedSubjectPrefLabel . - } - optional { - ?creator aocat:has_email ?creatorEmail . - } - optional { - ?record aocat:has_description ?description . - } - optional { - ?record aocat:has_access_policy / rdfs:label ?accessPolicy . - } - optional { - ?record aocat:has_landing_page / rdfs:label ?landingPage . - } - optional { - ?record aocat:has_temporal_coverage ?temporalNative . - ?temporalNative aocat:has_native_period / skos:prefLabel ?temporalNativePeriodName . - } - optional { - ?record aocat:has_temporal_coverage ?temporal . - ?temporal aocat:has_period / skos:prefLabel ?temporalPeriodName . - optional { - ?temporal aocat:from ?temporalFrom . - ?temporal aocat:until ?temporalUntil . - } - } - - { - select * - where { - ?record aocat:is_part_of ?collection . - ?collection aocat:was_issued ?issued . - ?collection aocat:was_modified ?modified . - ?collection aocat:has_contributor ?contributor . - ?contributor aocat:has_name ?contributorName . - ?collection aocat:has_responsible ?legalResponsible . - ?legalResponsible aocat:has_name ?legalResponsibleName . - ?collection aocat:has_owner ?owner . - ?owner aocat:has_name ?ownerName . - ?collection aocat:has_publisher ?publisher . - ?publisher aocat:has_name ?publisherName . - ?collection aocat:has_access_rights ?accessRights . - ?collection aocat:has_ARIADNE_subject ?archeologicalResourceType . - ?archeologicalResourceType skos:prefLabel ?archeologicalResourceTypeName . - optional { - ?contributor aocat:has_email ?contributorEmail . - } - optional { - ?legalResponsible aocat:has_email ?legalResponsibleEmail . - } - optional { - ?owner aocat:has_email ?ownerEmail . - } - optional { - ?publisher aocat:has_email ?publisherEmail . - } - } - } - { - select * - where { - ?record aocat:has_spatial_coverage ?spatialRegion . - ?spatialRegion aocat:has_place_name ?spatialPlaceName . - optional { - ?spatialRegion aocat:has_coordinate_system ?spatialCoordinateSystem . - } - optional { - ?record aocat:has_spatial_coverage ?spatialRegionPoint . - ?spatialRegionPoint aocat:has_latitude ?spatialLocationLat ; - aocat:has_longitude ?spatialLocationLon . - } - optional { - ?record aocat:has_spatial_coverage ?spatialRegionBox . - ?spatialRegionBox aocat:has_bounding_box_max_lat ?spatialLocationBBMaxLat ; - aocat:has_bounding_box_max_lon ?spatialLocationBBMaxLon ; - aocat:has_bounding_box_min_lat ?spatialLocationBBMinLat ; - aocat:has_bounding_box_min_lon ?spatialLocationBBMinLon ; - } - } - } - -} \ No newline at end of file diff --git a/dnet-ariadneplus-graphdb-publisher/test/java/eu/dnetlib/ariadneplus/GraphDbReaderAndESIndexTest.java b/dnet-ariadneplus-graphdb-publisher/test/java/eu/dnetlib/ariadneplus/GraphDbReaderAndESIndexTest.java index c979607..82b6154 100644 --- a/dnet-ariadneplus-graphdb-publisher/test/java/eu/dnetlib/ariadneplus/GraphDbReaderAndESIndexTest.java +++ b/dnet-ariadneplus-graphdb-publisher/test/java/eu/dnetlib/ariadneplus/GraphDbReaderAndESIndexTest.java @@ -26,7 +26,7 @@ public class GraphDbReaderAndESIndexTest { private RunSPARQLQueryService runSPQRLQuery; @Test - @Ignore +// @Ignore public void readAndIndexTest() throws Exception { final ClassPathResource resource = new ClassPathResource("application.properties"); Properties appProps = new Properties(); @@ -36,7 +36,7 @@ public class GraphDbReaderAndESIndexTest { runSPQRLQuery.setupConnection( appProps.getProperty("graphdb.writer.user"), appProps.getProperty("graphdb.writer.pwd"), - appProps.getProperty("repository.url"), + appProps.getProperty("graphdb.serverUrl"), appProps.getProperty("graphdb.repository")); ParseRDFJSON parseRDFJSON = new ParseRDFJSON(); parseRDFJSON.setCatalogEntryJsonPath(appProps.getProperty("catalog.entry.path")); @@ -48,12 +48,11 @@ public class GraphDbReaderAndESIndexTest { appProps.getProperty("exclude.predicates"), appProps.getProperty("class.map.specifications") ); - resourceManager.init(); runSPQRLQuery.setResourceManager(resourceManager); BulkUpload bulkUpload = new BulkUpload(); - bulkUpload.init(); + bulkUpload.init(appProps.getProperty("elasticsearch.hostname"),appProps.getProperty("elasticsearch.indexname")); runSPQRLQuery.setBulkUpload(bulkUpload); - String recordId = ""; + String recordId = ""; String datasource = "ads"; String collectionId = "271"; List recordIds = Arrays.asList(recordId); @@ -69,14 +68,15 @@ public class GraphDbReaderAndESIndexTest { final ClassPathResource resource = new ClassPathResource("application.properties"); Properties appProps = new Properties(); appProps.load(resource.getInputStream()); - + String datasource = "ads"; + String collectionId = "271"; runSPQRLQuery = new RunSPARQLQueryService(); runSPQRLQuery.setupConnection( appProps.getProperty("graphdb.writer.user"), appProps.getProperty("graphdb.writer.pwd"), appProps.getProperty("repository.url"), appProps.getProperty("graphdb.repository")); - runSPQRLQuery.selectRecordIds(); + runSPQRLQuery.selectRecordIds(datasource, collectionId); } @Test diff --git a/dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/IndexOnESJobNode.java b/dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/IndexOnESJobNode.java new file mode 100644 index 0000000..7704254 --- /dev/null +++ b/dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/IndexOnESJobNode.java @@ -0,0 +1,155 @@ +package eu.dnetlib.ariadneplus.workflows.nodes; + +import com.google.common.collect.Lists; +import eu.dnetlib.enabling.resultset.client.ResultSetClient; +import eu.dnetlib.msro.workflows.graph.Arc; +import eu.dnetlib.msro.workflows.nodes.AsyncJobNode; +import eu.dnetlib.msro.workflows.procs.Env; +import eu.dnetlib.msro.workflows.procs.Token; +import eu.dnetlib.msro.workflows.util.ResultsetProgressProvider; +import eu.dnetlib.msro.workflows.util.WorkflowsConstants; +import eu.dnetlib.rmi.common.ResultSet; +import eu.dnetlib.rmi.manager.MSROException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.http.NameValuePair; +import org.apache.http.client.entity.UrlEncodedFormEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import org.apache.http.message.BasicNameValuePair; +import org.springframework.beans.factory.annotation.Autowired; + +import java.io.IOException; +import java.net.ConnectException; +import java.util.List; + + +public class IndexOnESJobNode extends AsyncJobNode { + + private static final Log log = LogFactory.getLog(IndexOnESJobNode.class); + + private String eprParam; + + @Autowired + private ResultSetClient resultSetClient; + + private String publisherEndpoint; + private String datasourceInterface; + private String datasource; + + //for parallel requests to the publisher endpoint + private int nThreads = 5; + + @Override + protected String execute(final Env env) throws Exception { + + int statusCode = -1; + String indexOnESResult = "noResult"; + log.info("Publisher endpoint: " + getPublisherEndpoint()); + PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(); + cm.setMaxTotal(nThreads); + CloseableHttpClient client = HttpClients.custom().setConnectionManager(cm).build(); + + log.info("IndexOnES endpoint: " + getIndexOnESEndpoint()); + CloseableHttpResponse responsePOST = null; + try { + HttpPost post = new HttpPost(getIndexOnESEndpoint()); + List params = Lists.newArrayList(); + String[] splits = getDatasourceInterface().split("::"); + String datasource = splits[2]; + String collectionId = splits[3]; + params.add(new BasicNameValuePair("datasource", datasource)); + params.add(new BasicNameValuePair("collectionId", collectionId)); + UrlEncodedFormEntity ent = new UrlEncodedFormEntity(params, "UTF-8"); + post.setEntity(ent); + log.info("Calling IndexOnES endpoint with params: "+getDatasource()+" "+getDatasourceInterface()); + responsePOST = client.execute(post); + statusCode = responsePOST.getStatusLine().getStatusCode(); + switch (statusCode) { + case 200: + log.info("index on ES completed"); + break; + default: + log.error("error indexing on ES " + responsePOST.getStatusLine().getStatusCode() + ": " + responsePOST.getStatusLine().getReasonPhrase()); + break; + } + } catch (ConnectException ce) { + log.error(ce); + throw new MSROException("Unable to connect to Publisher endpoint" + getIndexOnESEndpoint()); + } + catch (IOException e) { + log.error(e); + throw new MSROException("IO Error" + getIndexOnESEndpoint()); + } + finally{ + if(responsePOST != null) responsePOST.close(); + client.close(); + cm.shutdown(); + } + + env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "statusCode", Integer.toString(statusCode)); + env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "enrichResult", indexOnESResult); + + log.info(indexOnESResult); + if (statusCode!=200) { + throw new MSROException("Error from Publisher endpoint [ status code: " + statusCode + " ]"); + } + + return Arc.DEFAULT_ARC; + } + + public String getPublisherEndpoint() { + return publisherEndpoint; + } + + private String getIndexOnESEndpoint() { + return publisherEndpoint.concat("/indexOnES"); + } + + public void setPublisherEndpoint(final String publisherEndpoint) { + this.publisherEndpoint = publisherEndpoint; + } + + public ResultSetClient getResultSetClient() { + return resultSetClient; + } + + public void setResultSetClient(final ResultSetClient resultSetClient) { + this.resultSetClient = resultSetClient; + } + + public String getEprParam() { + return eprParam; + } + + public void setEprParam(String eprParam) { + this.eprParam = eprParam; + } + + public String getDatasourceInterface() { + return datasourceInterface; + } + + + public void setDatasourceInterface(String datasourceInterface) { + this.datasourceInterface = datasourceInterface; + } + + @Override + protected void beforeStart(Token token) { + token.setProgressProvider(new ResultsetProgressProvider(token.getEnv().getAttribute(getEprParam(), ResultSet.class), this.resultSetClient)); + } + + public String getDatasource() { + return datasource; + } + + public void setDatasource(String datasource) { + this.datasource = datasource; + } + +} diff --git a/dnet-ariadneplus/src/main/resources/eu/dnetlib/ariadneplus/workflows/nodes/applicationContext-ariadneplus-msro-nodes.xml b/dnet-ariadneplus/src/main/resources/eu/dnetlib/ariadneplus/workflows/nodes/applicationContext-ariadneplus-msro-nodes.xml index cca0863..a3bb235 100644 --- a/dnet-ariadneplus/src/main/resources/eu/dnetlib/ariadneplus/workflows/nodes/applicationContext-ariadneplus-msro-nodes.xml +++ b/dnet-ariadneplus/src/main/resources/eu/dnetlib/ariadneplus/workflows/nodes/applicationContext-ariadneplus-msro-nodes.xml @@ -17,4 +17,6 @@ + + diff --git a/dnet-ariadneplus/src/main/resources/eu/dnetlib/ariadneplus/workflows/repo-hi/index_on_es_wf.xml.st b/dnet-ariadneplus/src/main/resources/eu/dnetlib/ariadneplus/workflows/repo-hi/index_on_es_wf.xml.st new file mode 100644 index 0000000..4ced6d1 --- /dev/null +++ b/dnet-ariadneplus/src/main/resources/eu/dnetlib/ariadneplus/workflows/repo-hi/index_on_es_wf.xml.st @@ -0,0 +1,54 @@ + + +
+ + + + + +
+ + $name$ + $desc$ + + aggregator + $priority$ + + + + + http://localhost:8080/ariadneplus/publish + + + + Index on Elastic Search all records of a collection from GraphDB + + + + + + + + + + + + + + + + + + + + + + + + + 9 9 9 ? * * + 10080 + + + +
diff --git a/dnet-ariadneplus/src/main/resources/eu/dnetlib/bootstrap/profiles/workflows/index_on_es_template.xml b/dnet-ariadneplus/src/main/resources/eu/dnetlib/bootstrap/profiles/workflows/index_on_es_template.xml new file mode 100644 index 0000000..ef24ed1 --- /dev/null +++ b/dnet-ariadneplus/src/main/resources/eu/dnetlib/bootstrap/profiles/workflows/index_on_es_template.xml @@ -0,0 +1,32 @@ + +
+ + + + + +
+ + + + + + + http://localhost:8281/ariadneplus-graphdb + + + + Index on Elastic Search all records of a collection from GraphDB + + + + + + + + + + + + +
\ No newline at end of file diff --git a/dnet-ariadneplus/src/main/resources/eu/dnetlib/bootstrap/profiles/workflows/repo_hi_index_on_es.xml b/dnet-ariadneplus/src/main/resources/eu/dnetlib/bootstrap/profiles/workflows/repo_hi_index_on_es.xml new file mode 100644 index 0000000..668db14 --- /dev/null +++ b/dnet-ariadneplus/src/main/resources/eu/dnetlib/bootstrap/profiles/workflows/repo_hi_index_on_es.xml @@ -0,0 +1,60 @@ + + +
+ + + + + +
+ + Index On ES + Index on Elastic Search all records of a collection from GraphDB + + IndexOnES + Content Provider + + REPO_HI + 20 + + + + + Verify if DS is pending + + + + + + + + + + + Validate DS + + + + + + + Create Workflow + + + + + + + + + + + + + + 9 9 9 ? * * + 10080 + + + +