From db2ad3f97ef67fa78be1c6cb46e1c982c968df29 Mon Sep 17 00:00:00 2001
From: Enrico Ottonello
Date: Wed, 12 Aug 2020 15:47:55 +0200
Subject: [PATCH] multithreaded HTTP client not needed on IndexOnESJobNode
 (may avoid 404 responses after a few minutes); added a log entry for each
 indexing operation

---
 .../reader/RunSPARQLQueryService.java         |  14 ++
 .../ADS/ordered_sparql_insert_398.sparql      | 124 ++++++++++++++++++
 .../GraphDbReaderAndESIndexTest.java          |   6 +-
 .../workflows/nodes/IndexOnESJobNode.java     |  14 +-
 4 files changed, 149 insertions(+), 9 deletions(-)
 create mode 100644 dnet-ariadneplus-graphdb-publisher/src/main/resources/eu/dnetlib/ariadneplus/sparql/enrich/ADS/ordered_sparql_insert_398.sparql

diff --git a/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/reader/RunSPARQLQueryService.java b/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/reader/RunSPARQLQueryService.java
index 4e39d1a..2a3597a 100644
--- a/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/reader/RunSPARQLQueryService.java
+++ b/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/reader/RunSPARQLQueryService.java
@@ -67,8 +67,11 @@ public class RunSPARQLQueryService {
         log.info("Start indexing "+ recordIds.size()+ " records ...");
         final List<Integer> errorCodesCount = Arrays.asList(new Integer(0));
         final List<Integer> successCodesCount = Arrays.asList(new Integer(0));
+        final List<Integer> counter = Arrays.asList(new Integer(0));
         recordIds.forEach(recordId -> {
+            log.info(recordId+" >");
             int operationResult = executeQueryGraph(selectQueryTemplate, recordId, isCollection);
+            log.info(" "+operationResult);
             if (operationResult!=200) {
                 log.error(recordId + " error_code: "+ Integer.toString(operationResult));
                 int currentErrorsCount = errorCodesCount.get(0).intValue();
@@ -80,7 +83,18 @@ public class RunSPARQLQueryService {
                 currentSuccessCount+=1;
                 successCodesCount.set(0, new Integer(currentSuccessCount));
             }
+            int counterValue = counter.get(0).intValue();
+            String curReport = null;
+            if ((counterValue % 1000) == 0) {
+                curReport = "Current analyzed records: "+counterValue+" Current indexed records: "+ successCodesCount.get(0).intValue() +
+                        " , " + "Current errors: "+ errorCodesCount.get(0).intValue();
+                log.info(curReport);
+            }
+            counterValue+=1;
+            counter.set(0, new Integer(counterValue));
+
         });
+
         String report = "Total indexed records: "+ successCodesCount.get(0).intValue() +
                 " , " + "Total errors: "+ errorCodesCount.get(0).intValue();
         log.info(report);
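
The added progress logging above uses single-element Integer lists as mutable counters so they can be updated from inside the forEach lambda. The following self-contained sketch shows the same counting and periodic reporting written with java.util.concurrent.atomic.AtomicInteger; it is an illustration only, not part of this patch, and its class and method names are made up.

    // Illustrative sketch only (not in this patch): progress reporting with AtomicInteger.
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;

    public class IndexingProgressSketch {

        // stand-in for executeQueryGraph(...): pretend every record is indexed successfully
        private static int executeQuery(String recordId) {
            return 200;
        }

        public static void main(String[] args) {
            List<String> recordIds = Arrays.asList("rec-1", "rec-2", "rec-3");
            AtomicInteger errors = new AtomicInteger(0);
            AtomicInteger successes = new AtomicInteger(0);
            AtomicInteger analyzed = new AtomicInteger(0);

            recordIds.forEach(recordId -> {
                int operationResult = executeQuery(recordId);
                if (operationResult != 200) {
                    errors.incrementAndGet();
                } else {
                    successes.incrementAndGet();
                }
                // report progress every 1000 analyzed records
                int count = analyzed.incrementAndGet();
                if (count % 1000 == 0) {
                    System.out.println("Current analyzed records: " + count
                            + " Current indexed records: " + successes.get()
                            + " , Current errors: " + errors.get());
                }
            });

            System.out.println("Total indexed records: " + successes.get()
                    + " , Total errors: " + errors.get());
        }
    }
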
diff --git a/dnet-ariadneplus-graphdb-publisher/src/main/resources/eu/dnetlib/ariadneplus/sparql/enrich/ADS/ordered_sparql_insert_398.sparql b/dnet-ariadneplus-graphdb-publisher/src/main/resources/eu/dnetlib/ariadneplus/sparql/enrich/ADS/ordered_sparql_insert_398.sparql
new file mode 100644
index 0000000..d8ad25f
--- /dev/null
+++ b/dnet-ariadneplus-graphdb-publisher/src/main/resources/eu/dnetlib/ariadneplus/sparql/enrich/ADS/ordered_sparql_insert_398.sparql
@@ -0,0 +1,124 @@
+PREFIX aocat:
+PREFIX skos:
+PREFIX rdf:
+PREFIX rdfs:
+INSERT {
+    GRAPH {
+        ?collection aocat:has_ARIADNE_subject ?archeologicalResourceType .
+        ?archeologicalResourceType skos:prefLabel ?archeologicalResourceTypeName .
+        ?archeologicalResourceType rdfs:label ?archeologicalResourceTypeName .
+    }
+}
+WHERE {
+    ?collection rdf:type .
+    ?collection aocat:has_ARIADNE_subject ?archeologicalResourceType .
+    ?archeologicalResourceType skos:prefLabel ?archeologicalResourceTypeName .
+    ?collection aocat:has_original_id "1000398" .
+};
+
+PREFIX aocat:
+PREFIX skos:
+PREFIX ariadneplus:
+
+INSERT {
+    GRAPH {
+        ?record aocat:was_issued ?issued .
+        ?record aocat:was_modified ?modified .
+        ?record aocat:has_contributor ?contributor .
+        ?record aocat:has_responsible ?legalResponsible .
+        ?record aocat:has_owner ?owner .
+        ?record aocat:has_publisher ?publisher .
+        ?record aocat:has_access_rights ?accessRights .
+        ?record aocat:has_ARIADNE_subject ?archeologicalResourceType .
+    }
+}
+USING
+WHERE {
+    ?record aocat:is_part_of ?collection .
+    ?collection aocat:was_issued ?issued .
+    ?collection aocat:was_modified ?modified .
+    ?collection aocat:has_contributor ?contributor .
+    ?collection aocat:has_responsible ?legalResponsible .
+    ?collection aocat:has_owner ?owner .
+    ?collection aocat:has_publisher ?publisher .
+    ?collection aocat:has_access_rights ?accessRights .
+    ?collection aocat:has_ARIADNE_subject ?archeologicalResourceType .
+};
+
+PREFIX skos:
+PREFIX aocat:
+PREFIX rdf:
+INSERT {
+    GRAPH {
+        ?s aocat:has_native_subject .
+        skos:prefLabel "Not provided" .
+    }
+}
+WHERE {
+    GRAPH {
+        ?s rdf:type aocat:AO_Individual_Data_Resource .
+        MINUS {
+            ?s rdf:type aocat:AO_Individual_Data_Resource .
+            ?s aocat:has_native_subject ?ns .
+        }
+    }
+};
+
+PREFIX aocat:
+PREFIX skos:
+
+INSERT {
+    GRAPH {
+        ?record aocat:has_derived_subject ?aat .
+    }
+}
+USING
+USING
+WHERE {
+    {
+        ?record aocat:has_native_subject ?native_subject .
+        ?native_subject skos:exactMatch ?aat .
+    }
+    union
+    {
+        ?record aocat:has_native_subject ?native_subject .
+        ?native_subject skos:broadMatch ?aat .
+    }
+    union
+    {
+        ?record aocat:has_native_subject ?native_subject .
+        ?native_subject skos:closeMatch ?aat .
+    }
+    union
+    {
+        ?record aocat:has_native_subject ?native_subject .
+        ?native_subject skos:narrowMatch ?aat .
+    }
+};
+
+PREFIX skos:
+PREFIX aocat:
+PREFIX time:
+INSERT {
+    GRAPH {
+        ?temporal aocat:has_period ?periodO .
+        ?temporal aocat:from ?temporalFrom .
+        ?temporal aocat:until ?temporalUntil .
+    }
+}
+WHERE {
+    GRAPH {
+        ?temporal aocat:has_native_period ?native_period .
+        ?native_period skos:prefLabel ?native_label .
+        optional {
+            GRAPH {
+                ?periodO skos:altLabel ?native_label .
+                ?periodO skos:inScheme .
+                ?periodO time:intervalStartedBy ?intervalStartedBy .
+                ?intervalStartedBy skos:prefLabel ?temporalFrom .
+                ?periodO time:intervalFinishedBy ?intervalFinishedBy .
+                ?intervalFinishedBy skos:prefLabel ?temporalUntil .
+            }
+        }
+    }
+};
\ No newline at end of file
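
The new resource above is an ordered sequence of SPARQL UPDATE statements (separated by "};") that enrich the records of ADS collection 398 before indexing. The patch does not show how the file is executed; the sketch below is only one hypothetical way to run such a file statement by statement against a GraphDB repository using RDF4J. The endpoint URLs, repository name, RDF4J 3.x usage and the naive "};" splitting are assumptions for illustration, not taken from this patch.

    // Hypothetical sketch (not part of this patch): executing the ordered enrichment
    // updates against a GraphDB SPARQL endpoint with RDF4J (RDF4J 3.x API assumed).
    import java.io.InputStream;
    import java.nio.charset.StandardCharsets;
    import org.apache.commons.io.IOUtils;
    import org.eclipse.rdf4j.query.QueryLanguage;
    import org.eclipse.rdf4j.repository.RepositoryConnection;
    import org.eclipse.rdf4j.repository.sparql.SPARQLRepository;

    public class OrderedEnrichmentSketch {

        public static void main(String[] args) throws Exception {
            // assumed GraphDB endpoints; the real ones come from the service configuration
            SPARQLRepository repo = new SPARQLRepository(
                    "http://localhost:7200/repositories/ariadneplus",
                    "http://localhost:7200/repositories/ariadneplus/statements");
            repo.init();
            try {
                String updates;
                try (InputStream in = OrderedEnrichmentSketch.class.getClassLoader().getResourceAsStream(
                        "eu/dnetlib/ariadneplus/sparql/enrich/ADS/ordered_sparql_insert_398.sparql")) {
                    updates = IOUtils.toString(in, StandardCharsets.UTF_8);
                }
                try (RepositoryConnection conn = repo.getConnection()) {
                    // the file separates its updates with "};": split naively and restore
                    // the closing brace, then execute each statement in file order
                    for (String update : updates.split("\\};")) {
                        if (update.trim().isEmpty()) {
                            continue;
                        }
                        conn.prepareUpdate(QueryLanguage.SPARQL, update + "}").execute();
                    }
                }
            } finally {
                repo.shutDown();
            }
        }
    }
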
diff --git a/dnet-ariadneplus-graphdb-publisher/test/java/eu/dnetlib/ariadneplus/GraphDbReaderAndESIndexTest.java b/dnet-ariadneplus-graphdb-publisher/test/java/eu/dnetlib/ariadneplus/GraphDbReaderAndESIndexTest.java
index 608de5d..f9c6e3b 100644
--- a/dnet-ariadneplus-graphdb-publisher/test/java/eu/dnetlib/ariadneplus/GraphDbReaderAndESIndexTest.java
+++ b/dnet-ariadneplus-graphdb-publisher/test/java/eu/dnetlib/ariadneplus/GraphDbReaderAndESIndexTest.java
@@ -21,7 +21,7 @@ import java.util.Properties;
  * @author enrico.ottonello
  *
  */
-@Ignore
+//@Ignore
 public class GraphDbReaderAndESIndexTest {
 
     private RunSPARQLQueryService runSPQRLQuery;
@@ -58,7 +58,7 @@ public class GraphDbReaderAndESIndexTest {
         final ClassPathResource queryTemplateResource;
         boolean testRecord = true;
         if (testRecord) {
-            recordId = "https://ariadne-infrastructure.eu/aocat/Resource/FE3155A7-AF9F-3C5F-A92E-93041EF495E0";
+            recordId = "https://ariadne-infrastructure.eu/aocat/Resource/D200902C-A1C2-346E-8F37-E8A429260ADE";
             queryTemplateResource = new ClassPathResource("eu/dnetlib/ariadneplus/sparql/read_record_data_template.sparql");
         }
         else {
@@ -67,7 +67,7 @@ public class GraphDbReaderAndESIndexTest {
         }
 
         String datasource = "ads";
-        String collectionId = "270";
+        String collectionId = "398";
         List<String> recordIds = Arrays.asList(recordId);
         String queryTemplate = IOUtils.toString(queryTemplateResource.getInputStream(), StandardCharsets.UTF_8.name());
         if (testRecord) {
diff --git a/dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/IndexOnESJobNode.java b/dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/IndexOnESJobNode.java
index 74a0a3a..8bdfdd2 100644
--- a/dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/IndexOnESJobNode.java
+++ b/dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/IndexOnESJobNode.java
@@ -16,6 +16,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.http.NameValuePair;
+import org.apache.http.client.HttpClient;
 import org.apache.http.client.entity.UrlEncodedFormEntity;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpPost;
@@ -46,7 +47,7 @@ public class IndexOnESJobNode extends AsyncJobNode {
     private String datasource;
 
     //for parallel requests to the publisher endpoint
-    private int nThreads = 5;
+//    private int nThreads = 5;
 
     @Override
     protected String execute(final Env env) throws Exception {
@@ -54,10 +55,11 @@
         int statusCode = -1;
         String indexOnESResult = "noResult";
         log.info("Publisher endpoint: " + getPublisherEndpoint());
-        PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
-        cm.setMaxTotal(nThreads);
-        CloseableHttpClient client = HttpClients.custom().setConnectionManager(cm).build();
-
+//        PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
+//        cm.setMaxTotal(nThreads);
+//        CloseableHttpClient client = HttpClients.custom().setConnectionManager(cm).build();
+        CloseableHttpClient client = HttpClients.createDefault();
+
         log.info("IndexOnES endpoint: " + getIndexOnESEndpoint());
         CloseableHttpResponse responsePOST = null;
         try {
@@ -95,7 +97,7 @@
         finally{
             if(responsePOST != null) responsePOST.close();
             client.close();
-            cm.shutdown();
+//            cm.shutdown();
         }
         env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "statusCode", Integer.toString(statusCode));
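
For context on the IndexOnESJobNode change: the pooled connection manager sized by nThreads is replaced with a plain default client, since the node appears to issue a single sequential POST per workflow run. The self-contained sketch below shows that call pattern with Apache HttpClient 4.x; the endpoint URL and form parameters are placeholders, not the actual publisher API. HttpClients.createDefault() already wraps a small internal connection pool, so no explicit PoolingHttpClientConnectionManager is needed here.

    // Hypothetical sketch of the simplified HTTP call adopted above (Apache HttpClient 4.x);
    // URL and parameters are placeholders.
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.http.NameValuePair;
    import org.apache.http.client.entity.UrlEncodedFormEntity;
    import org.apache.http.client.methods.CloseableHttpResponse;
    import org.apache.http.client.methods.HttpPost;
    import org.apache.http.impl.client.CloseableHttpClient;
    import org.apache.http.impl.client.HttpClients;
    import org.apache.http.message.BasicNameValuePair;
    import org.apache.http.util.EntityUtils;

    public class IndexOnESPostSketch {

        public static void main(String[] args) throws Exception {
            // placeholder endpoint, not the real publisher/index API
            String endpoint = "http://localhost:8280/ariadneplus/publish/indexOnES";

            List<NameValuePair> params = new ArrayList<>();
            params.add(new BasicNameValuePair("datasource", "ads"));   // placeholder values
            params.add(new BasicNameValuePair("collectionId", "398"));

            HttpPost post = new HttpPost(endpoint);
            post.setEntity(new UrlEncodedFormEntity(params, StandardCharsets.UTF_8));

            // default client keeps a small internal pool; closing it releases all connections
            try (CloseableHttpClient client = HttpClients.createDefault();
                 CloseableHttpResponse response = client.execute(post)) {
                int statusCode = response.getStatusLine().getStatusCode();
                String body = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
                System.out.println("statusCode=" + statusCode + " body=" + body);
            }
        }
    }
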