diff --git a/dhp-common/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixFlat.java b/dhp-common/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixFlat.java
deleted file mode 100644
index fb4dd0259..000000000
--- a/dhp-common/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixFlat.java
+++ /dev/null
@@ -1,158 +0,0 @@
-package eu.dnetlib.dhp.sx.graph.scholix;
-
-import java.util.List;
-
-public class ScholixFlat {
-	private String identifier;
-	private String relationType;
-	private String sourceId;
-	private String sourceType;
-	private String sourceSubType;
-	private List<String> sourcePid;
-	private List<String> sourcePidType;
-	private List<String> sourcePublisher;
-	private String targetId;
-	private String targetType;
-	private String targetSubType;
-	private List<String> targetPid;
-	private List<String> targetPidType;
-	private List<String> targetPublisher;
-	private List<String> linkProviders;
-	private String publicationDate;
-	private String blob;
-
-	public String getIdentifier() {
-		return identifier;
-	}
-
-	public void setIdentifier(String identifier) {
-		this.identifier = identifier;
-	}
-
-	public String getRelationType() {
-		return relationType;
-	}
-
-	public void setRelationType(String relationType) {
-		this.relationType = relationType;
-	}
-	public String getSourceId() {
-		return sourceId;
-	}
-
-	public void setSourceId(String sourceId) {
-		this.sourceId = sourceId;
-	}
-
-	public String getSourceType() {
-		return sourceType;
-	}
-
-	public void setSourceType(String sourceType) {
-		this.sourceType = sourceType;
-	}
-
-	public String getSourceSubType() {
-		return sourceSubType;
-	}
-
-	public void setSourceSubType(String sourceSubType) {
-		this.sourceSubType = sourceSubType;
-	}
-
-	public List<String> getSourcePid() {
-		return sourcePid;
-	}
-
-	public void setSourcePid(List<String> sourcePid) {
-		this.sourcePid = sourcePid;
-	}
-
-	public List<String> getSourcePidType() {
-		return sourcePidType;
-	}
-
-	public void setSourcePidType(List<String> sourcePidType) {
-		this.sourcePidType = sourcePidType;
-	}
-
-	public List<String> getSourcePublisher() {
-		return sourcePublisher;
-	}
-
-	public void setSourcePublisher(List<String> sourcePublisher) {
-		this.sourcePublisher = sourcePublisher;
-	}
-
-	public String getTargetId() {
-		return targetId;
-	}
-
-	public void setTargetId(String targetId) {
-		this.targetId = targetId;
-	}
-
-	public String getTargetType() {
-		return targetType;
-	}
-
-	public void setTargetType(String targetType) {
-		this.targetType = targetType;
-	}
-
-	public String getTargetSubType() {
-		return targetSubType;
-	}
-
-	public void setTargetSubType(String targetSubType) {
-		this.targetSubType = targetSubType;
-	}
-
-	public List<String> getTargetPid() {
-		return targetPid;
-	}
-
-	public void setTargetPid(List<String> targetPid) {
-		this.targetPid = targetPid;
-	}
-
-	public List<String> getTargetPidType() {
-		return targetPidType;
-	}
-
-	public void setTargetPidType(List<String> targetPidType) {
-		this.targetPidType = targetPidType;
-	}
-
-	public List<String> getTargetPublisher() {
-		return targetPublisher;
-	}
-
-	public void setTargetPublisher(List<String> targetPublisher) {
-		this.targetPublisher = targetPublisher;
-	}
-
-	public List<String> getLinkProviders() {
-		return linkProviders;
-	}
-
-	public void setLinkProviders(List<String> linkProviders) {
-		this.linkProviders = linkProviders;
-	}
-
-	public String getPublicationDate() {
-		return publicationDate;
-	}
-
-	public void setPublicationDate(String publicationDate) {
-		this.publicationDate = publicationDate;
-	}
-
-	public String getBlob() {
-		return blob;
-	}
-
-	public void setBlob(String blob) {
-		this.blob = blob;
-	}
-}
diff --git a/dhp-common/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala b/dhp-common/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala
index 538858ab4..91756416e 100644
--- a/dhp-common/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala
+++ b/dhp-common/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala
@@ -1,6 +1,14 @@
 package eu.dnetlib.dhp.sx.graph.scholix

-import eu.dnetlib.dhp.schema.oaf.{Dataset, OtherResearchProduct, Publication, Relation, Result, Software, StructuredProperty}
+import eu.dnetlib.dhp.schema.oaf.{
+  Dataset,
+  OtherResearchProduct,
+  Publication,
+  Relation,
+  Result,
+  Software,
+  StructuredProperty
+}
 import eu.dnetlib.dhp.schema.sx.scholix._
 import eu.dnetlib.dhp.schema.sx.summary.{AuthorPid, CollectedFromType, SchemeValue, ScholixSummary, Typology}
 import eu.dnetlib.dhp.utils.DHPUtils
@@ -263,14 +271,16 @@ object ScholixUtils extends Serializable {

     if (summaryObject.getAuthor != null && !summaryObject.getAuthor.isEmpty) {
       val l: List[ScholixEntityId] =
-        summaryObject.getAuthor.asScala.map(a => {
-          if (a.getORCID != null)
-            new ScholixEntityId(
-              a.getFullname,
-              List(new ScholixIdentifier(a.getORCID, "ORCID", s"https://orcid.org/${a.getORCID}")).asJava
-            )
-          else new ScholixEntityId(a.getFullname, null)
-        }).toList
+        summaryObject.getAuthor.asScala
+          .map(a => {
+            if (a.getORCID != null)
+              new ScholixEntityId(
+                a.getFullname,
+                List(new ScholixIdentifier(a.getORCID, "ORCID", s"https://orcid.org/${a.getORCID}")).asJava
+              )
+            else new ScholixEntityId(a.getFullname, null)
+          })
+          .toList
       if (l.nonEmpty)
         r.setCreator(l.asJava)
     }
@@ -416,11 +426,11 @@ object ScholixUtils extends Serializable {
       return null
     s.setLocalIdentifier(persistentIdentifiers.asJava)
     r match {
-      case _: Publication => s.setTypology(Typology.publication)
-      case _: Dataset     => s.setTypology(Typology.dataset)
-      case _: Software    => s.setTypology(Typology.software)
+      case _: Publication          => s.setTypology(Typology.publication)
+      case _: Dataset              => s.setTypology(Typology.dataset)
+      case _: Software             => s.setTypology(Typology.software)
       case _: OtherResearchProduct => s.setTypology(Typology.otherresearchproduct)
-      case _ =>
+      case _                       =>
     }

     s.setSubType(r.getInstance().get(0).getInstancetype.getClassname)
diff --git a/dhp-common/src/test/java/eu/dnetlib/dhp/sx/graph/scholix/ScholixFlatTest.java b/dhp-common/src/test/java/eu/dnetlib/dhp/sx/graph/scholix/ScholixFlatTest.java
index 0e921d9dd..54c74908e 100644
--- a/dhp-common/src/test/java/eu/dnetlib/dhp/sx/graph/scholix/ScholixFlatTest.java
+++ b/dhp-common/src/test/java/eu/dnetlib/dhp/sx/graph/scholix/ScholixFlatTest.java
@@ -11,6 +11,7 @@ import org.junit.jupiter.api.Test;

 import com.fasterxml.jackson.databind.ObjectMapper;

 import eu.dnetlib.dhp.schema.sx.scholix.Scholix;
+import eu.dnetlib.dhp.schema.sx.scholix.ScholixFlat;

 public class ScholixFlatTest {
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/dump/SendToZenodoHDFS.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/dump/SendToZenodoHDFS.java
new file mode 100644
index 000000000..000a73395
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/dump/SendToZenodoHDFS.java
@@ -0,0 +1,91 @@
+
+package eu.dnetlib.dhp.sx.graph.dump;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.api.MissingConceptDoiException;
+import eu.dnetlib.dhp.common.api.ZenodoAPIClient;
+
+public class SendToZenodoHDFS implements Serializable {
+
+	private static final String NEW = "new"; // to be used for a brand new deposition in zenodo
+	private static final String VERSION = "version"; // to be used to upload a new version of a published deposition
+	private static final String UPDATE = "update"; // to upload content to an open deposition not published
+
+	public static void main(final String[] args) throws Exception, MissingConceptDoiException {
+		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+			IOUtils
+				.toString(
+					SendToZenodoHDFS.class
+						.getResourceAsStream(
+							"/eu/dnetlib/dhp/sx/graph/dump/upload_zenodo.json")));
+
+		parser.parseArgument(args);
+
+		final String hdfsPath = parser.get("hdfsPath");
+		final String hdfsNameNode = parser.get("nameNode");
+		final String access_token = parser.get("accessToken");
+		final String connection_url = parser.get("connectionUrl");
+		final String metadata = parser.get("metadata");
+		final String depositionType = parser.get("depositionType");
+		final String concept_rec_id = Optional
+			.ofNullable(parser.get("conceptRecordId"))
+			.orElse(null);
+
+		final String depositionId = Optional.ofNullable(parser.get("depositionId")).orElse(null);
+
+		Configuration conf = new Configuration();
+		conf.set("fs.defaultFS", hdfsNameNode);
+
+		FileSystem fileSystem = FileSystem.get(conf);
+
+		RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem
+			.listFiles(
+				new Path(hdfsPath), true);
+		ZenodoAPIClient zenodoApiClient = new ZenodoAPIClient(connection_url, access_token);
+		switch (depositionType) {
+			case NEW:
+				zenodoApiClient.newDeposition();
+				break;
+			case VERSION:
+				if (concept_rec_id == null) {
+					throw new MissingConceptDoiException("No concept record id has been provided");
+				}
+				zenodoApiClient.newVersion(concept_rec_id);
+				break;
+			case UPDATE:
+				if (depositionId == null) {
+					throw new MissingConceptDoiException("No deposition id has been provided");
+				}
+				zenodoApiClient.uploadOpenDeposition(depositionId);
+				break;
+			default:
+				throw new RuntimeException("No available entries");
+		}
+
+		while (fileStatusListIterator.hasNext()) {
+			LocatedFileStatus fileStatus = fileStatusListIterator.next();
+
+			Path p = fileStatus.getPath();
+			String pString = p.toString();
+			if (!pString.endsWith("_SUCCESS")) {
+				String name = pString.substring(pString.lastIndexOf("/") + 1);
+
+				FSDataInputStream inputStream = fileSystem.open(p);
+//				zenodoApiClient.uploadIS(inputStream, name);
+
+			}
+
+		}
+		if (!metadata.equals("")) {
+			zenodoApiClient.sendMretadata(metadata);
+		}
+	}
+
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/dump/upload_zenodo.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/dump/upload_zenodo.json
new file mode 100644
index 000000000..a85779f87
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/dump/upload_zenodo.json
@@ -0,0 +1,50 @@
+[
+  {
+    "paramName": "dt",
+    "paramLongName": "depositionType",
+    "paramDescription": "the type of the deposition (new, version, update)",
+    "paramRequired": true
+  },
+  {
+    "paramName": "cri",
+    "paramLongName": "conceptRecordId",
+    "paramDescription": "The id of the concept record for a new version",
+    "paramRequired": false
+  },
+  {
+    "paramName": "di",
+    "paramLongName": "depositionId",
"paramDescription": "the id of an open deposition which has not been published", + "paramRequired": false + }, + { + "paramName": "hdfsp", + "paramLongName": "hdfsPath", + "paramDescription": "the path of the folder tofind files to send to Zenodo", + "paramRequired": true + }, + { + "paramName": "nn", + "paramLongName": "nameNode", + "paramDescription": "the name node", + "paramRequired": true + }, + { + "paramName": "at", + "paramLongName": "accessToken", + "paramDescription": "the access token for the deposition", + "paramRequired": false + }, + { + "paramName": "cu", + "paramLongName": "connectionUrl", + "paramDescription": "the url to connect to deposit", + "paramRequired": false + }, + { + "paramName": "m", + "paramLongName": "metadata", + "paramDescription": "metadata associated to the deposition", + "paramRequired": false + } +] diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/dumpScholix/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/dumpScholix/oozie_app/workflow.xml index d47ebb0be..cdc0ee625 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/dumpScholix/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/dumpScholix/oozie_app/workflow.xml @@ -8,148 +8,36 @@ targetPath the final graph path - - relationFilter - Filter relation semantic - - - maxNumberOfPid - filter relation with at least #maxNumberOfPid - - - dumpCitations - false - should dump citation relations - - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - yarn - cluster - Import JSONRDD to Dataset kryo - eu.dnetlib.dhp.sx.graph.SparkConvertRDDtoDataset - dhp-graph-mapper-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.shuffle.partitions=3000 - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --masteryarn - --sourcePath${sourcePath} - --targetPath${targetPath} - --filterRelation${relationFilter} - - - - - - - - - yarn - cluster - Convert Entities to summaries - eu.dnetlib.dhp.sx.graph.SparkCreateSummaryObject - dhp-graph-mapper-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.shuffle.partitions=20000 - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --masteryarn - --sourcePath${targetPath}/entities - --targetPath${targetPath}/provision/summaries - - - - - - - - yarn - cluster - Generate Scholix Dataset - eu.dnetlib.dhp.sx.graph.SparkCreateScholix - dhp-graph-mapper-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.shuffle.partitions=30000 - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf 
spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --masteryarn - --summaryPath${targetPath}/provision/summaries - --targetPath${targetPath}/provision/scholix - --relationPath${targetPath}/relation - --dumpCitations${dumpCitations} - - - - - - - - - - - - - - - - - yarn - cluster - Serialize scholix to JSON - eu.dnetlib.dhp.sx.graph.SparkConvertObjectToJson - dhp-graph-mapper-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.shuffle.partitions=6000 - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --masteryarn - --sourcePath${targetPath}/provision/scholix/scholix - --targetPath${targetPath}/json/scholix_json - --objectTypescholix - --maxPidNumberFiltermaxNumberOfPid - - - - - eu.dnetlib.dhp.common.MakeTarArchive --nameNode${nameNode} + --hdfsPath${targetPath} + --sourcePath${sourcePath} + + + + + + + + eu.dnetlib.dhp.sx.graph.dump.SendToZenodoHDFS --hdfsPath${targetPath}/tar - --sourcePath${targetPath}/json + --nameNode${nameNode} + --accessToken${accessToken} + --connectionUrl${connectionUrl} + --metadata${metadata} + --conceptRecordId${conceptRecordId} + --depositionType${depositionType} + --depositionId${depositionId} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/DropAndCreateESIndex.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/DropAndCreateESIndex.java index e5faccd0f..99462e807 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/DropAndCreateESIndex.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/DropAndCreateESIndex.java @@ -103,7 +103,7 @@ public class DropAndCreateESIndex { Objects .requireNonNull( DropAndCreateESIndex.class - .getResourceAsStream("/eu/dnetlib/dhp/sx/provision/scholix_index.json"))); + .getResourceAsStream("/eu/dnetlib/dhp/sx/provision/scholix_index_flat.json"))); log.info("creating Index SCHOLIX"); final HttpPut put = new HttpPut(String.format(url, ip, index, "scholix")); diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkIndexCollectionOnES.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkIndexCollectionOnES.java index dd08215d5..770a25d82 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkIndexCollectionOnES.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkIndexCollectionOnES.java @@ -42,7 +42,8 @@ public class SparkIndexCollectionOnES { .toString( Objects .requireNonNull( - DropAndCreateESIndex.class.getResourceAsStream("/eu/dnetlib/dhp/sx/provision/cluster.json"))); + SparkIndexCollectionOnES.class + .getResourceAsStream("/eu/dnetlib/dhp/sx/provision/cluster.json"))); @SuppressWarnings("unchecked") final Map clusterMap = new ObjectMapper().readValue(clusterJson, Map.class); diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/workflow.xml index 61a8eb2f9..15aee794c 100644 --- 
a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/workflow.xml
@@ -14,7 +14,7 @@
-
+
 Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/scholix_index_flat.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/scholix_index_flat.json
new file mode 100644
index 000000000..b1d616abf
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/scholix_index_flat.json
@@ -0,0 +1,77 @@
+{
+  "mappings": {
+    "properties": {
+      "blob": {
+        "type": "binary",
+        "index": false
+      },
+      "identifier": {
+        "type": "keyword"
+      },
+      "linkProviders": {
+        "type": "keyword"
+      },
+      "publicationDate": {
+        "type": "date"
+      },
+      "relationType": {
+        "type": "keyword"
+      },
+      "sourceId": {
+        "type": "keyword"
+      },
+      "sourcePid": {
+        "type": "keyword"
+      },
+      "sourcePidType": {
+        "type": "keyword"
+      },
+      "sourcePublisher": {
+        "type": "keyword"
+      },
+      "sourceSubType": {
+        "type": "keyword"
+      },
+      "sourceType": {
+        "type": "keyword"
+      },
+      "targetId": {
+        "type": "keyword"
+      },
+      "targetPid": {
+        "type": "keyword"
+      },
+      "targetPidType": {
+        "type": "keyword"
+      },
+      "targetPublisher": {
+        "type": "keyword"
+      },
+      "targetSubType": {
+        "type": "keyword"
+      },
+      "targetType": {
+        "type": "keyword"
+      }
+    }
+  },
+  "settings": {
+    "index": {
+      "refresh_interval": "600s",
+      "number_of_shards": "48",
+      "translog": {
+        "sync_interval": "15s",
+        "durability": "ASYNC"
+      },
+      "analysis": {
+        "analyzer": {
+          "analyzer_keyword": {
+            "filter": "lowercase",
+            "tokenizer": "keyword"
+          }
+        }
+      },
+      "number_of_replicas": "0"
+    }
+  }
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/sx/FlatIndexTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/sx/FlatIndexTest.java
new file mode 100644
index 000000000..55cbb8b1a
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/sx/FlatIndexTest.java
@@ -0,0 +1,146 @@
+
+package eu.dnetlib.dhp.sx;
+
+import static eu.dnetlib.dhp.sx.provision.DropAndCreateESIndex.APPLICATION_JSON;
+import static eu.dnetlib.dhp.sx.provision.DropAndCreateESIndex.STATUS_CODE_TEXT;
+
+import java.io.*;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.commons.codec.binary.Base64InputStream;
+import org.apache.commons.codec.binary.Base64OutputStream;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.schema.sx.scholix.Scholix;
+import eu.dnetlib.dhp.schema.sx.scholix.ScholixFlat;
+import eu.dnetlib.dhp.sx.graph.scholix.ScholixUtils;
+import eu.dnetlib.dhp.sx.provision.DropAndCreateESIndex;
+import eu.dnetlib.dhp.sx.provision.SparkIndexCollectionOnES;
+
+public class FlatIndexTest {
+	@Test
+	public void dropAndCreateIndex() {
+
+		Logger log = LoggerFactory.getLogger(getClass().getName());
+
+		final String url = "http://localhost:9200/dli_scholix";
+
+		try (CloseableHttpClient client = HttpClients.createDefault()) {
+
+			HttpDelete delete = new HttpDelete(url);
+
+			CloseableHttpResponse response = client.execute(delete);
+
+			log.info("deleting Index SCHOLIX");
+			log.info(STATUS_CODE_TEXT, response.getStatusLine());
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+
+		try (CloseableHttpClient client = HttpClients.createDefault()) {
+
+			final String scholixConf = IOUtils
+				.toString(
+					Objects
+						.requireNonNull(
+							DropAndCreateESIndex.class
+								.getResourceAsStream("/eu/dnetlib/dhp/sx/provision/scholix_index_flat.json")));
+
+			log.info("creating Index SCHOLIX");
+			final HttpPut put = new HttpPut(url);
+
+			final StringEntity entity = new StringEntity(scholixConf);
+			put.setEntity(entity);
+			put.setHeader("Accept", APPLICATION_JSON);
+			put.setHeader("Content-type", APPLICATION_JSON);
+
+			final CloseableHttpResponse response = client.execute(put);
+			log.info(STATUS_CODE_TEXT, response.getStatusLine());
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	private String compressString(final String input) {
+		try {
+			ByteArrayOutputStream os = new ByteArrayOutputStream();
+			Base64OutputStream b64os = new Base64OutputStream(os);
+			GZIPOutputStream gzip = new GZIPOutputStream(b64os);
+			gzip.write(input.getBytes(StandardCharsets.UTF_8));
+			gzip.close();
+			b64os.close();
+			return new String(os.toByteArray(), StandardCharsets.UTF_8);
+		} catch (Throwable t) {
+			t.printStackTrace();
+			return null;
+		}
+	}
+
+	private String uncompress(final String compressed) throws Exception {
+		Base64InputStream bis = new Base64InputStream(new ByteArrayInputStream(compressed.getBytes()));
+		GZIPInputStream gzip = new GZIPInputStream(bis);
+		return IOUtils.toString(gzip);
+	}
+
+	public void testFeedIndex() throws Exception {
+
+		InputStream gzipStream = new GZIPInputStream(
+			getClass().getResourceAsStream("/eu/dnetlib/dhp/sx/provision/scholix_dump.gz"));
+		Reader decoder = new InputStreamReader(gzipStream, "UTF-8");
+		BufferedReader buffered = new BufferedReader(decoder);
+		final ObjectMapper mapper = new ObjectMapper();
+		GZIPOutputStream gzip = new GZIPOutputStream(Files.newOutputStream(Paths.get("/tmp/scholix_flat.gz")));
+		String line = buffered.readLine();
+		while (line != null) {
+
+			final Scholix s = mapper.readValue(line, Scholix.class);
+			final ScholixFlat flat = ScholixUtils.flattenizeScholix(s, compressString(line));
+			gzip.write(mapper.writeValueAsString(flat).concat("\n").getBytes(StandardCharsets.UTF_8));
+			line = buffered.readLine();
+		}
+		gzip.close();
+
+		SparkConf conf = new SparkConf()
+			.setAppName(SparkIndexCollectionOnES.class.getSimpleName())
+			.setMaster("local[*]");
+		final SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
+
+		try (final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext())) {
+
+			JavaRDD<String> inputRdd = sc.textFile("/tmp/scholix_flat.gz");
+
+			Map<String, String> esCfg = new HashMap<>();
+			esCfg.put("es.nodes", "localhost");
"identifier"); + esCfg.put("es.batch.write.retry.count", "8"); + esCfg.put("es.batch.write.retry.wait", "60s"); + esCfg.put("es.batch.size.entries", "200"); + esCfg.put("es.nodes.wan.only", "true"); + JavaEsSpark.saveJsonToEs(inputRdd, "scholix", esCfg); + } + } +} diff --git a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/sx/provision/scholix_dump.gz b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/sx/provision/scholix_dump.gz new file mode 100644 index 000000000..157680efe Binary files /dev/null and b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/sx/provision/scholix_dump.gz differ diff --git a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/sx/provision/scholix_dump.zip b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/sx/provision/scholix_dump.zip deleted file mode 100644 index e9afa5a55..000000000 Binary files a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/sx/provision/scholix_dump.zip and /dev/null differ