From ba9f07a6fe1c09214ad1ca673273fa1a54c2daf4 Mon Sep 17 00:00:00 2001
From: sandro
Date: Wed, 8 Apr 2020 13:18:20 +0200
Subject: [PATCH 1/2] fixed wrong test

---
 .../provision/update/CrossRefParserJSON.java  | 108 ++++++++++++++++++
 .../dhp/provision/update/CrossrefClient.java  |  88 ++++++++++++++
 .../{ => update}/Datacite2Scholix.java        |  52 +++------
 .../dhp/provision/update/DataciteClient.java  |  75 ++++++++++++
 .../DataciteClientIterator.java}              |   6 +-
 .../RetrieveUpdateFromDatacite.java           |  35 ++++--
 .../update/SparkResolveScholixTarget.java     |  81 +++++++++++++
 .../input_resolve_scholix_parameters.json     |  26 +++++
 .../input_retrieve_update_parameters.json     |  33 ++++++
 .../provision/retrieve_update_parameters.json |   0
 .../dhp/sx/provision/oozie_app/workflow.xml   | 102 ++++++++---------
 .../dhp/provision/DataciteClientTest.java     |  76 ++++++------
 12 files changed, 541 insertions(+), 141 deletions(-)
 create mode 100644 dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/CrossRefParserJSON.java
 create mode 100644 dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/CrossrefClient.java
 rename dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/{ => update}/Datacite2Scholix.java (90%)
 create mode 100644 dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/DataciteClient.java
 rename dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/{DataciteClient.java => update/DataciteClientIterator.java} (93%)
 rename dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/{ => update}/RetrieveUpdateFromDatacite.java (52%)
 create mode 100644 dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/SparkResolveScholixTarget.java
 create mode 100644 dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/provision/input_resolve_scholix_parameters.json
 create mode 100644 dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/provision/input_retrieve_update_parameters.json
 delete mode 100644 dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/provision/retrieve_update_parameters.json

diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/CrossRefParserJSON.java b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/CrossRefParserJSON.java
new file mode 100644
index 000000000..828d8f9b5
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/CrossRefParserJSON.java
@@ -0,0 +1,108 @@
+package eu.dnetlib.dhp.provision.update;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import eu.dnetlib.dhp.provision.scholix.ScholixCollectedFrom;
+import eu.dnetlib.dhp.provision.scholix.ScholixEntityId;
+import eu.dnetlib.dhp.provision.scholix.ScholixIdentifier;
+import eu.dnetlib.dhp.provision.scholix.ScholixResource;
+import eu.dnetlib.dhp.utils.DHPUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class CrossRefParserJSON {
+
+    private static List<ScholixCollectedFrom> collectedFrom = generateCrossrefCollectedFrom("complete");
+
+    public static ScholixResource parseRecord(final String record) {
+        if (record == null) return null;
+        JsonElement jElement = new JsonParser().parse(record);
+        JsonElement source = null;
+        if (jElement.getAsJsonObject().has("_source")) {
+            source = jElement.getAsJsonObject().get("_source");
+            if (source == null || !source.isJsonObject())
+                return null;
+        } else if (jElement.getAsJsonObject().has("DOI")) {
+            source = jElement;
+        } else {
+            return null;
+        }
+
+        final JsonObject message = source.getAsJsonObject();
+        ScholixResource currentObject = new ScholixResource();
+
+        if (message.get("DOI") != null) {
+            final String doi = message.get("DOI").getAsString();
+            currentObject.setIdentifier(Collections.singletonList(new ScholixIdentifier(doi, "doi")));
+        }
+
+        if ((!message.get("created").isJsonNull()) && (message.getAsJsonObject("created").get("date-time") != null)) {
+            currentObject.setPublicationDate(message.getAsJsonObject("created").get("date-time").getAsString());
+        }
+
+        if (message.get("title") != null && !message.get("title").isJsonNull() && message.get("title").isJsonArray()) {
+
+            JsonArray array = message.get("title").getAsJsonArray();
+            currentObject.setTitle(array.get(0).getAsString());
+        }
+        if (message.get("author") != null && !message.get("author").isJsonNull()) {
+            JsonArray author = message.getAsJsonArray("author");
+            List<ScholixEntityId> authorList = new ArrayList<>();
+            for (JsonElement anAuthor : author) {
+                JsonObject currentAuth = anAuthor.getAsJsonObject();
+
+                String family = "";
+                String given = "";
+                if (currentAuth != null && currentAuth.get("family") != null && !currentAuth.get("family").isJsonNull()) {
+                    family = currentAuth.get("family").getAsString();
+                }
+                if (currentAuth != null && currentAuth.get("given") != null && !currentAuth.get("given").isJsonNull()) {
+                    given = currentAuth.get("given").getAsString();
+                }
+                authorList.add(new ScholixEntityId(String.format("%s %s", family, given), null));
+            }
+            currentObject.setCreator(authorList);
+        }
+        if (message.get("publisher") != null && !message.get("publisher").isJsonNull()) {
+            currentObject.setPublisher(Collections.singletonList(new ScholixEntityId(message.get("publisher").getAsString(), null)));
+        }
+        currentObject.setCollectedFrom(collectedFrom);
+        currentObject.setObjectType("publication");
+        currentObject.setDnetIdentifier(generateId(message.get("DOI").getAsString(), "doi", "publication"));
+
+        return currentObject;
+    }
+
+    private static List<ScholixCollectedFrom> generateCrossrefCollectedFrom(final String completionStatus) {
+        final ScholixEntityId scholixEntityId = new ScholixEntityId("Crossref",
+                Collections.singletonList(new ScholixIdentifier("dli_________::crossref", "dnet_identifier")));
+        return Collections.singletonList(
+                new ScholixCollectedFrom(
+                        scholixEntityId, "resolved", completionStatus));
+    }
+
+    private static String generateId(final String pid, final String pidType, final String entityType) {
+        String type;
+        switch (entityType) {
+            case "publication":
+                type = "50|";
+                break;
+            case "dataset":
+                type = "60|";
+                break;
+            case "unknown":
+                type = "70|";
+                break;
+            default:
+                throw new IllegalArgumentException("unexpected value " + entityType);
+        }
+        return type + DHPUtils.md5(String.format("%s::%s", pid.toLowerCase().trim(), pidType.toLowerCase().trim()));
+    }
+
+
+}
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/CrossrefClient.java b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/CrossrefClient.java
new file mode 100644
index 000000000..3190ee516
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/CrossrefClient.java
@@ -0,0 +1,88 @@
+package eu.dnetlib.dhp.provision.update;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+import eu.dnetlib.dhp.provision.scholix.ScholixResource;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+
+import java.io.ByteArrayOutputStream;
+import java.util.zip.Inflater;
+
+public class CrossrefClient {
+
+    private String host;
+    private String index = "crossref";
+    private String indexType = "item";
+
+    public CrossrefClient(String host) {
+        this.host = host;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public void setHost(String host) {
+        this.host = host;
+    }
+
+    public String getIndex() {
+        return index;
+    }
+
+    public void setIndex(String index) {
+        this.index = index;
+    }
+
+    public String getIndexType() {
+        return indexType;
+    }
+
+    public void setIndexType(String indexType) {
+        this.indexType = indexType;
+    }
+
+    private static String decompressBlob(final String blob) {
+        try {
+            byte[] byteArray = Base64.decodeBase64(blob.getBytes());
+            final Inflater decompresser = new Inflater();
+            decompresser.setInput(byteArray);
+            final ByteArrayOutputStream bos = new ByteArrayOutputStream(byteArray.length);
+            byte[] buffer = new byte[8192];
+            while (!decompresser.finished()) {
+                int size = decompresser.inflate(buffer);
+                bos.write(buffer, 0, size);
+            }
+            byte[] unzippeddata = bos.toByteArray();
+            decompresser.end();
+            return new String(unzippeddata);
+        } catch (Throwable e) {
+            throw new RuntimeException("Wrong record:" + blob, e);
+        }
+    }
+
+
+    public ScholixResource getResourceByDOI(final String doi) {
+        try (CloseableHttpClient client = HttpClients.createDefault()) {
+            HttpGet httpGet = new HttpGet(String.format("http://%s:9200/%s/%s/%s", host, index, indexType, doi.replaceAll("/", "%2F")));
+            CloseableHttpResponse response = client.execute(httpGet);
+            String json = IOUtils.toString(response.getEntity().getContent());
+            if (json.contains("blob")) {
+                JsonParser p = new JsonParser();
+                final JsonElement root = p.parse(json);
+                json = decompressBlob(root.getAsJsonObject().get("_source").getAsJsonObject().get("blob").getAsString());
+            }
+            return CrossRefParserJSON.parseRecord(json);
+        } catch (Throwable e) {
+            return null;
+        }
+    }
+}
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/Datacite2Scholix.java b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/Datacite2Scholix.java
similarity index 90%
rename from dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/Datacite2Scholix.java
rename to dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/Datacite2Scholix.java
index 809186a50..c6617a823 100644
--- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/Datacite2Scholix.java
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/Datacite2Scholix.java
@@ -1,4 +1,4 @@
-package eu.dnetlib.dhp.provision;
+package eu.dnetlib.dhp.provision.update;
 
 import com.jayway.jsonpath.JsonPath;
 import eu.dnetlib.dhp.provision.scholix.*;
@@ -15,16 +15,14 @@ import java.util.stream.Collectors;
 
 public class Datacite2Scholix {
-
+    private String rootPath = "$.attributes";
     final RelationMapper relationMapper;
 
     public Datacite2Scholix(RelationMapper relationMapper) {
         this.relationMapper = relationMapper;
     }
-
     public List<Scholix> generateScholixFromJson(final String dJson) {
-
        List<Map<String, String>> relIds = getRelatedIdentifiers(dJson);
        relIds = relIds!= null ? relIds.stream().filter(m-> m.containsKey("relatedIdentifierType") && m.containsKey("relationType" ) && m.containsKey( "relatedIdentifier")
@@ -32,22 +30,24 @@ public class Datacite2Scholix {
        if(relIds== null || relIds.size() ==0 )
            return null;
-
-
-        final String updated = JsonPath.read(dJson,"$.attributes.updated" );
+        final String updated = JsonPath.read(dJson, rootPath + ".updated");
         ScholixResource resource = generateDataciteScholixResource(dJson);
         return relIds.stream().flatMap(s-> {
             final List<Scholix> result = generateScholix(resource, s.get("relatedIdentifier"), s.get("relatedIdentifierType"), s.get("relationType"), updated);
             return result.stream();
         }).collect(Collectors.toList());
-
     }
+    public String getRootPath() {
+        return rootPath;
+    }
+
+    public void setRootPath(String rootPath) {
+        this.rootPath = rootPath;
+    }
 
     private List<Scholix> generateScholix(ScholixResource source, final String pid, final String pidtype, final String relType, final String updated) {
-
-
         if ("doi".equalsIgnoreCase(pidtype)) {
             ScholixResource target = new ScholixResource();
             target.setIdentifier(Collections.singletonList(new ScholixIdentifier(pid, pidtype)));
@@ -92,20 +92,17 @@ public class Datacite2Scholix {
             result.add(s2);
             return result;
         }
-
-
-
     }
 
     public ScholixResource generateDataciteScholixResource(String dJson) {
         ScholixResource resource = new ScholixResource();
-        String DOI_PATH = "$.attributes.doi";
+        String DOI_PATH = rootPath + ".doi";
         final String doi = JsonPath.read(dJson, DOI_PATH);
         resource.setIdentifier(Collections.singletonList(new ScholixIdentifier(doi, "doi")));
         resource.setObjectType(getType(dJson));
         resource.setDnetIdentifier(generateId(doi, "doi", resource.getObjectType()));
         resource.setCollectedFrom(generateDataciteCollectedFrom("complete"));
-        final String publisher = JsonPath.read(dJson, "$.attributes.publisher");
+        final String publisher = JsonPath.read(dJson, rootPath + ".publisher");
         if (StringUtils.isNotBlank(publisher))
             resource.setPublisher(Collections.singletonList(new ScholixEntityId(publisher, null)));
         final String date = getDate(dJson);
@@ -119,7 +116,7 @@ public class Datacite2Scholix {
     }
 
     private List<ScholixEntityId> getCreators(final String json) {
-        final List<String> creatorName = JsonPath.read(json, "$.attributes.creators[*].name");
+        final List<String> creatorName = JsonPath.read(json, rootPath + ".creators[*].name");
         if (creatorName!= null && creatorName.size() >0) {
             return creatorName.stream().map(s-> new ScholixEntityId(s, null)).collect(Collectors.toList());
         }
@@ -127,12 +124,12 @@ public class Datacite2Scholix {
     }
 
     private String getTitle(final String json){
-        final List<String> titles = JsonPath.read(json, "$.attributes.titles[*].title");
+        final List<String> titles = JsonPath.read(json, rootPath + ".titles[*].title");
         return titles!= null && titles.size()>0?titles.get(0): null;
     }
 
     private String getDate(final String json) {
-        final List<Map<String, String>> dates = JsonPath.read(json,"$.attributes.dates");
+        final List<Map<String, String>> dates = JsonPath.read(json, rootPath + ".dates");
         if(dates!= null && dates.size()>0){
 
             List<Map<String, String>> issued = dates.stream().filter(s -> "issued".equalsIgnoreCase(s.get("dateType"))).collect(Collectors.toList());
@@ -152,7 +149,7 @@ public class Datacite2Scholix {
     private String getType(final String json) {
         try {
-            final String bibtext = JsonPath.read(json, "$.attributes.types.bibtex");
+            final String bibtext = JsonPath.read(json, rootPath + ".types.bibtex");
             if ("article".equalsIgnoreCase(bibtext)) {
                 return "publication";
             }
@@ -162,14 +159,10 @@ public class Datacite2Scholix {
         }
     }
 
-
-
-
     private List<Map<String, String>> getRelatedIdentifiers(final String json) {
-        String REL_IDENTIFIER_PATH = "$.attributes.relatedIdentifiers[*]";
+        String REL_IDENTIFIER_PATH = rootPath + ".relatedIdentifiers[*]";
         List<Map<String, String>> res = JsonPath.read(json, REL_IDENTIFIER_PATH);
         return res;
-
     }
 
     protected String generateId(final String pid, final String pidType, final String entityType) {
@@ -186,18 +179,7 @@ public class Datacite2Scholix {
                 break;
             default:
                 throw new IllegalArgumentException("unexpected value "+entityType);
-
         }
-
         return type+ DHPUtils.md5(String.format("%s::%s", pid.toLowerCase().trim(), pidType.toLowerCase().trim()));
     }
-
-
-
-
-
-
-
-
-
 }
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/DataciteClient.java b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/DataciteClient.java
new file mode 100644
index 000000000..e1d25bf2e
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/DataciteClient.java
@@ -0,0 +1,75 @@
+package eu.dnetlib.dhp.provision.update;
+
+import eu.dnetlib.dhp.provision.scholix.ScholixResource;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+
+import java.io.IOException;
+
+public class DataciteClient {
+
+    private String host;
+    private String index = "datacite";
+    private String indexType = "dump";
+    private Datacite2Scholix d2s;
+
+    public DataciteClient(String host) {
+        this.host = host;
+
+        d2s = new Datacite2Scholix(null);
+        d2s.setRootPath("$._source.attributes");
+    }
+
+    public Iterable<String> getDatasetsFromTs(final Long timestamp) {
+        return () -> {
+            try {
+                return new DataciteClientIterator(host, index, timestamp);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        };
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public void setHost(String host) {
+        this.host = host;
+    }
+
+    public String getIndex() {
+        return index;
+    }
+
+    public void setIndex(String index) {
+        this.index = index;
+    }
+
+    public String getIndexType() {
+        return indexType;
+    }
+
+    public void setIndexType(String indexType) {
+        this.indexType = indexType;
+    }
+
+    public ScholixResource getDatasetByDOI(final String doi) {
+        try (CloseableHttpClient client = HttpClients.createDefault()) {
+            HttpGet httpGet = new HttpGet(String.format("http://%s:9200/%s/%s/%s", host, index, indexType, doi.replaceAll("/", "%2F")));
+            CloseableHttpResponse response = client.execute(httpGet);
+            final String json = IOUtils.toString(response.getEntity().getContent());
+            return d2s.generateDataciteScholixResource(json);
+        } catch (Throwable e) {
+            return null;
+        }
+    }
+
+}
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/DataciteClient.java b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/DataciteClientIterator.java
similarity index 93%
rename from dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/DataciteClient.java
rename to dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/DataciteClientIterator.java
index 37a0bb23b..e823945ae 100644
--- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/DataciteClient.java
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/DataciteClientIterator.java
@@ -1,4 +1,4 @@
-package eu.dnetlib.dhp.provision;
+package eu.dnetlib.dhp.provision.update;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.jayway.jsonpath.JsonPath;
 import net.minidev.json.JSONArray;
@@ -14,7 +14,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Collectors;
 
-public class DataciteClient implements Iterator<String> {
+public class DataciteClientIterator implements Iterator<String> {
 
     final static String blobPath = "$.hits.hits[*]._source";
     final static String scrollIdPath = "$._scroll_id";
@@ -27,7 +27,7 @@ public class DataciteClient implements Iterator<String> {
     final String esIndex;
     final ObjectMapper mapper = new ObjectMapper();
 
-    public DataciteClient(final String esHost, final String esIndex, final long timestamp) throws IOException {
+    public DataciteClientIterator(final String esHost, final String esIndex, final long timestamp) throws IOException {
         this.esHost = esHost;
         this.esIndex = esIndex;
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/RetrieveUpdateFromDatacite.java b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/RetrieveUpdateFromDatacite.java
similarity index 52%
rename from dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/RetrieveUpdateFromDatacite.java
rename to dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/RetrieveUpdateFromDatacite.java
index ad0034733..ea659dbb1 100644
--- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/RetrieveUpdateFromDatacite.java
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/RetrieveUpdateFromDatacite.java
@@ -1,8 +1,10 @@
-package eu.dnetlib.dhp.provision;
+package eu.dnetlib.dhp.provision.update;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.provision.scholix.Scholix;
+import eu.dnetlib.scholexplorer.relation.RelationMapper;
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -11,17 +13,19 @@ import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 
 import java.net.URI;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class RetrieveUpdateFromDatacite {
 
     public static void main(String[] args) throws Exception{
-        final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(RetrieveUpdateFromDatacite.class.getResourceAsStream("/eu/dnetlib/dhp/provision/retrieve_update_parameters.json")));
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(RetrieveUpdateFromDatacite.class.getResourceAsStream("/eu/dnetlib/dhp/provision/input_retrieve_update_parameters.json")));
         parser.parseArgument(args);
         final String hdfsuri = parser.get("namenode");
         Path hdfswritepath = new Path(parser.get("targetPath"));
-        final String timestamp = parser.get("timestamp");
-
+        final long timestamp = Long.parseLong(parser.get("timestamp"));
+        final String host = parser.get("indexHost");
+        final String index = parser.get("indexName");
 
         // ====== Init HDFS File System Object
         Configuration conf = new Configuration();
@@ -32,13 +36,28 @@ public class RetrieveUpdateFromDatacite {
         conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
         FileSystem.get(URI.create(hdfsuri), conf);
-
-        final AtomicInteger counter = new AtomicInteger(0);
+        final Datacite2Scholix d2s = new Datacite2Scholix(RelationMapper.load());
+        final ObjectMapper mapper = new ObjectMapper();
         try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                 SequenceFile.Writer.file(hdfswritepath), SequenceFile.Writer.keyClass(IntWritable.class),
                 SequenceFile.Writer.valueClass(Text.class))) {
+            final Text value = new Text();
+            final IntWritable key = new IntWritable();
+            int i = 0;
+            for(String dataset: new DataciteClient(host).getDatasetsFromTs(timestamp)) {
+                i++;
+                List<Scholix> scholix = d2s.generateScholixFromJson(dataset);
+                if (scholix!= null)
+                    for(Scholix s: scholix) {
+                        key.set(i);
+                        value.set(mapper.writeValueAsString(s));
+                        writer.append(key, value);
+                        if (i % 10000 == 0) {
+                            System.out.println("wrote "+i);
+                        }
+                    }
+            }
         }
     }
-
 }
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/SparkResolveScholixTarget.java b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/SparkResolveScholixTarget.java
new file mode 100644
index 000000000..4628c4684
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/SparkResolveScholixTarget.java
@@ -0,0 +1,81 @@
+package eu.dnetlib.dhp.provision.update;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.provision.scholix.Scholix;
+import eu.dnetlib.dhp.provision.scholix.ScholixIdentifier;
+import eu.dnetlib.dhp.provision.scholix.ScholixResource;
+import eu.dnetlib.dhp.utils.DHPUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.*;
+import scala.Tuple2;
+
+import java.util.Collections;
+
+public class SparkResolveScholixTarget {
+
+    public static void main(String[] args) throws Exception {
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkResolveScholixTarget.class.getResourceAsStream("/eu/dnetlib/dhp/provision/input_resolve_scholix_parameters.json")));
+        parser.parseArgument(args);
+
+        final SparkConf conf = new SparkConf();
+
+        final String master = parser.get("master");
+        final String sourcePath = parser.get("sourcePath");
+        final String workingDirPath= parser.get("workingDirPath");
+        final String indexHost= parser.get("indexHost");
+
+
+        try (SparkSession spark = getSession(conf, master)){
+
+            final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+
+
+            spark.createDataset(sc.sequenceFile(sourcePath, IntWritable.class, Text.class)
+                    .map(Tuple2::_2)
+                    .map(s-> new ObjectMapper().readValue(s.toString(), Scholix.class)).rdd(), Encoders.bean(Scholix.class))
+                    .write().save(workingDirPath+"/stepA");
+
+
+            Dataset<Scholix> s1 = spark.read().load(workingDirPath+"/stepA").as(Encoders.bean(Scholix.class));
+
+            s1.where(s1.col("target.dnetIdentifier").isNull()).select(s1.col("target.identifier")).distinct()
+                    .map((MapFunction<Row, ScholixResource>) f-> {
+                        final String pid = ((Row) f.getList(0).get(0)).getString(0);
+                        ScholixResource publication = new CrossrefClient(indexHost).getResourceByDOI(pid);
+                        if (publication != null) {
+                            return publication;
+                        }
+                        ScholixResource dataset = new DataciteClient(indexHost).getDatasetByDOI(pid);
+                        if (dataset!= null) {
+                            return dataset;
+                        }
+                        ScholixResource r = new ScholixResource();
+                        r.setIdentifier(Collections.singletonList(new ScholixIdentifier(pid, "doi")));
+                        r.setObjectType("unknown");
+                        r.setDnetIdentifier("70|"+DHPUtils.md5(String.format("%s::doi", pid.toLowerCase().trim())));
+
+                        return r;
+                    }, Encoders.bean(ScholixResource.class)).write().mode(SaveMode.Overwrite).save(workingDirPath+"/stepB");
+
+        }
+    }
+
+    private static SparkSession getSession(SparkConf conf, String master) {
+        return SparkSession
+                .builder()
+                .config(conf)
+                .appName(SparkResolveScholixTarget.class.getSimpleName())
+                .master(master)
+                .getOrCreate();
+    }
+
+}
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/provision/input_resolve_scholix_parameters.json b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/provision/input_resolve_scholix_parameters.json
new file mode 100644
index 000000000..e4b6b9dfd
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/provision/input_resolve_scholix_parameters.json
@@ -0,0 +1,26 @@
+[
+  {
+    "paramName": "m",
+    "paramLongName": "master",
+    "paramDescription": "the spark master",
+    "paramRequired": true
+  },
+  {
+    "paramName": "s",
+    "paramLongName": "sourcePath",
+    "paramDescription": "the source path",
+    "paramRequired": true
+  },
+  {
+    "paramName": "w",
+    "paramLongName": "workingDirPath",
+    "paramDescription": "the working Dir Path",
+    "paramRequired": true
+  },
+  {
+    "paramName": "h",
+    "paramLongName": "indexHost",
+    "paramDescription": "the index host name",
+    "paramRequired": true
+  }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/provision/input_retrieve_update_parameters.json b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/provision/input_retrieve_update_parameters.json
new file mode 100644
index 000000000..5c11aca8d
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/provision/input_retrieve_update_parameters.json
@@ -0,0 +1,33 @@
+[
+  {
+    "paramName": "n",
+    "paramLongName": "namenode",
+    "paramDescription": "the name node",
+    "paramRequired": true
+  },
+  {
+    "paramName": "t",
+    "paramLongName": "targetPath",
+    "paramDescription": "the working path where generated files are stored",
+    "paramRequired": true
+  },
+  {
+    "paramName": "ts",
+    "paramLongName": "timestamp",
+    "paramDescription": "the timestamp for incremental harvesting",
+    "paramRequired": true
+  },
+  {
+    "paramName": "ih",
+    "paramLongName": "indexHost",
+    "paramDescription": "the ip name of the index",
+    "paramRequired": true
+  },
+  {
+    "paramName": "in",
+    "paramLongName": "indexName",
+    "paramDescription": "the name of the index",
+    "paramRequired": true
+  }
+
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/provision/retrieve_update_parameters.json b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/provision/retrieve_update_parameters.json
deleted file mode 100644
index e69de29bb..000000000
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/workflow.xml
index abdc69097..68543e3f2 100644
--- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/workflow.xml
@@ -16,21 +16,21 @@
         <property>
            <name>sparkExecutorMemory</name>
            <description>memory for individual executor</description>
        </property>
-        <property>
-            <name>index</name>
-            <description>index name</description>
-        </property>
-        <property>
-            <name>idScholix</name>
-            <description>the identifier name of the scholix</description>
-        </property>
-        <property>
-            <name>idSummary</name>
-            <description>the identifier name of the summary</description>
-        </property>
+
+
+
+
+
+
+
+
+
+
+
+
    </parameters>
 
    <kill name="Kill">
-
+        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
@@ -98,45 +98,45 @@
-    <action name="indexSummary">
-        <spark xmlns="uri:oozie:spark-action:0.1">
-            <job-tracker>${jobTracker}</job-tracker>
-            <name-node>${nameNode}</name-node>
-            <master>yarn-cluster</master>
-            <mode>cluster</mode>
-            <name>index Summary</name>
-            <class>eu.dnetlib.dhp.provision.SparkIndexCollectionOnES</class>
-            <jar>dhp-graph-provision-scholexplorer-${projectVersion}.jar</jar>
-            <spark-opts>--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="32"</spark-opts>
-            <arg>-mt</arg> <arg>yarn-cluster</arg>
-            <arg>--sourcePath</arg><arg>${workingDirPath}/summary</arg>
-            <arg>--index</arg><arg>${index}_object</arg>
-            <arg>--idPath</arg><arg>id</arg>
-            <arg>--type</arg><arg>summary</arg>
-        </spark>
-        <ok to="indexScholix"/>
-        <error to="Kill"/>
-    </action>
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-    <action name="indexScholix">
-        <spark xmlns="uri:oozie:spark-action:0.1">
-            <job-tracker>${jobTracker}</job-tracker>
-            <name-node>${nameNode}</name-node>
-            <master>yarn-cluster</master>
-            <mode>cluster</mode>
-            <name>index scholix</name>
-            <class>eu.dnetlib.dhp.provision.SparkIndexCollectionOnES</class>
-            <jar>dhp-graph-provision-scholexplorer-${projectVersion}.jar</jar>
-            <spark-opts>--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="8"</spark-opts>
-            <arg>-mt</arg> <arg>yarn-cluster</arg>
-            <arg>--sourcePath</arg><arg>${workingDirPath}/scholix_json</arg>
-            <arg>--index</arg><arg>${index}_scholix</arg>
-            <arg>--idPath</arg><arg>identifier</arg>
-            <arg>--type</arg><arg>scholix</arg>
-        </spark>
-        <ok to="End"/>
-        <error to="Kill"/>
-    </action>
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
 </workflow-app>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/test/java/eu/dnetlib/dhp/provision/DataciteClientTest.java b/dhp-workflows/dhp-graph-provision-scholexplorer/src/test/java/eu/dnetlib/dhp/provision/DataciteClientTest.java
index 780d4a2d6..e008d72be 100644
--- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/test/java/eu/dnetlib/dhp/provision/DataciteClientTest.java
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/test/java/eu/dnetlib/dhp/provision/DataciteClientTest.java
@@ -2,6 +2,8 @@ package eu.dnetlib.dhp.provision;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import eu.dnetlib.dhp.provision.scholix.Scholix;
+import eu.dnetlib.dhp.provision.scholix.ScholixResource;
+import eu.dnetlib.dhp.provision.update.*;
 import eu.dnetlib.scholexplorer.relation.RelationMapper;
 import org.apache.commons.io.IOUtils;
 import org.apache.http.client.methods.CloseableHttpResponse;
@@ -9,10 +11,9 @@ import org.apache.http.client.methods.HttpPost;
 import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
-import java.io.BufferedWriter;
-import java.io.FileWriter;
 import java.io.IOException;
 import java.util.List;
 
@@ -27,58 +28,45 @@ public class DataciteClientTest {
         Datacite2Scholix ds = new Datacite2Scholix(mapper);
         final List<Scholix> s = ds.generateScholixFromJson(json);
-
-
         System.out.println(new ObjectMapper().writeValueAsString(s));
-
     }
 
-    @Test
+
     public void testClient() throws Exception {
-        DataciteClient client = new DataciteClient("ip-90-147-167-25.ct1.garrservices.it","datacite",1585454082);
-        int i = 0;
-        final RelationMapper mapper = RelationMapper.load();
+        RetrieveUpdateFromDatacite.main(new String[]{
+                "-n", "file:///data/new_s2.txt",
+                "-t", "/data/new_s2.txt",
+                "-ts", "1585760736",
+                "-ih", "ip-90-147-167-25.ct1.garrservices.it",
+                "-in", "datacite",
+        });
 
-        Datacite2Scholix ds = new Datacite2Scholix(mapper);
-        BufferedWriter writer = new BufferedWriter(new FileWriter("/Users/sandro/new_s.txt"));
-        final ObjectMapper m = new ObjectMapper();
+        SparkResolveScholixTarget.main(new String[]{
+                "-s", "file:///data/new_s.txt",
+                "-m", "local[*]",
+                "-w", "/data/scholix/provision",
+                "-h", "ip-90-147-167-25.ct1.garrservices.it",
+
+        });
+    }
+
+
+    public void testResolveDataset() throws Exception {
+        DataciteClient dc = new DataciteClient("ip-90-147-167-25.ct1.garrservices.it");
+        ScholixResource datasetByDOI = dc.getDatasetByDOI("10.17182/hepdata.15392.v1/t5");
+        Assertions.assertNotNull(datasetByDOI);
+        System.out.println(new ObjectMapper().writeValueAsString(datasetByDOI));
+
+
+        CrossrefClient cr = new CrossrefClient("ip-90-147-167-25.ct1.garrservices.it");
+        ScholixResource crossrefByDOI = cr.getResourceByDOI("10.26850/1678-4618eqj.v35.1.2010.p41-46");
+        Assertions.assertNotNull(crossrefByDOI);
+        System.out.println(new ObjectMapper().writeValueAsString(crossrefByDOI));
-
-
-        while (client.hasNext()){
-            i ++;
-
-
-            final String next = client.next();
-            try {
-                final List<Scholix> res = ds.generateScholixFromJson(next);
-                if (res!= null)
-                    res
-                            .forEach(
-                                    s -> {
-                                        try {
-
-                                            writer.write(m.writeValueAsString(s));
-                                            writer.write("\n");
-                                        } catch (Throwable e) {
-                                            throw new RuntimeException(e);
-                                        }
-                                    }
-                            );
-            }catch (Throwable t) {
-                System.out.println(next);
-                throw new RuntimeException(t);
-            }
-            if(i %1000 == 0) {
-                System.out.println("added "+i);
-            }
-        }
     }
 
     private String getResponse(final String url,final String json ) {

From 47f3d9b7574c927b38d6d729b534d27ae0cad477 Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Wed, 8 Apr 2020 13:24:43 +0200
Subject: [PATCH 2/2] unit test for GraphHiveImporterJob

---
 .../dhp/common/SparkSessionSupport.java       | 14 +++
 ...moteActionPayloadForGraphTableJobTest.java | 30 +-----
 .../dhp/oa/graph/GraphHiveImporterJob.java    | 69 ++++++++++++++
 .../dhp/oa/graph/SparkGraphImporterJob.java   | 62 -------------
 .../{ => hive}/oozie_app/config-default.xml   |  0
 .../oozie_app/lib/scripts/postprocessing.sql  |  0
 .../graph/{ => hive}/oozie_app/workflow.xml   |  2 +-
 .../oa/graph/input_graph_hive_parameters.json | 26 ++++++
 .../dhp/oa/graph/input_graph_parameters.json  |  6 --
 .../oa/graph/GraphHiveImporterJobTest.java    | 92 +++++++++++++++++++
 .../oa/graph/SparkGraphImporterJobTest.java   | 54 -----------
 11 files changed, 204 insertions(+), 151 deletions(-)
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/GraphHiveImporterJob.java
 delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/SparkGraphImporterJob.java
 rename dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/{ => hive}/oozie_app/config-default.xml (100%)
 rename dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/{ => hive}/oozie_app/lib/scripts/postprocessing.sql (100%)
 rename dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/{ => hive}/oozie_app/workflow.xml (97%)
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_graph_hive_parameters.json
 delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_graph_parameters.json
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/GraphHiveImporterJobTest.java
 delete mode 100644 dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/SparkGraphImporterJobTest.java

diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/SparkSessionSupport.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/SparkSessionSupport.java
index f42ee1c58..43c18a956 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/SparkSessionSupport.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/SparkSessionSupport.java
@@ -29,6 +29,20 @@ public class SparkSessionSupport {
         runWithSparkSession(c -> SparkSession.builder().config(c).getOrCreate(), conf, isSparkSessionManaged, fn);
     }
 
+    /**
+     * Runs a given function using SparkSession created with hive support and using default builder and supplied SparkConf.
+     * Stops SparkSession when SparkSession is managed. Allows to reuse SparkSession created externally.
+     *
+     * @param conf                  SparkConf instance
+     * @param isSparkSessionManaged When true will stop SparkSession
+     * @param fn                    Consumer to be applied to constructed SparkSession
+     */
+    public static void runWithSparkHiveSession(SparkConf conf,
+                                               Boolean isSparkSessionManaged,
+                                               ThrowingConsumer<SparkSession> fn) {
+        runWithSparkSession(c -> SparkSession.builder().config(c).enableHiveSupport().getOrCreate(), conf, isSparkSessionManaged, fn);
+    }
+
     /**
      * Runs a given function using SparkSession created using supplied builder and supplied SparkConf. Stops SparkSession
      * when SparkSession is managed. Allows to reuse SparkSession created externally.
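[Editorial note, not part of the patch: the hunk above only defines the new runWithSparkHiveSession helper; a minimal caller sketch follows. The example class, package and metastore URI are hypothetical — only the helper's signature comes from the patch.]

    package eu.dnetlib.dhp.common.examples; // hypothetical package

    import org.apache.spark.SparkConf;

    import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;

    public class SparkHiveSessionExample {

        public static void main(String[] args) throws Exception {
            SparkConf conf = new SparkConf();
            // hypothetical metastore URI; the workflows below pass it via --hiveMetastoreUris
            conf.set("hive.metastore.uris", "thrift://example-metastore:9083");

            // isSparkSessionManaged = true, so the helper stops the session once fn returns
            runWithSparkHiveSession(conf, Boolean.TRUE,
                    spark -> spark.sql("SHOW DATABASES").show());
        }
    }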
diff --git a/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJobTest.java b/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJobTest.java
index 755679903..6f53fbec2 100644
--- a/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJobTest.java
+++ b/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJobTest.java
@@ -1,6 +1,7 @@
 package eu.dnetlib.dhp.actionmanager.promote;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
 import eu.dnetlib.dhp.schema.oaf.*;
 import org.apache.commons.io.FileUtils;
 import org.apache.spark.SparkConf;
@@ -45,34 +46,7 @@ public class PromoteActionPayloadForGraphTableJobTest {
         conf.setAppName(PromoteActionPayloadForGraphTableJobTest.class.getSimpleName());
         conf.setMaster("local");
         conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-        conf.registerKryoClasses(new Class[]{
-                Author.class,
-                Context.class,
-                Country.class,
-                DataInfo.class,
-                eu.dnetlib.dhp.schema.oaf.Dataset.class,
-                Datasource.class,
-                ExternalReference.class,
-                ExtraInfo.class,
-                Field.class,
-                GeoLocation.class,
-                Instance.class,
-                Journal.class,
-                KeyValue.class,
-                Oaf.class,
-                OafEntity.class,
-                OAIProvenance.class,
-                Organization.class,
-                OriginDescription.class,
-                OtherResearchProduct.class,
-                Project.class,
-                Publication.class,
-                Qualifier.class,
-                Relation.class,
-                Result.class,
-                Software.class,
-                StructuredProperty.class
-        });
+        conf.registerKryoClasses(ModelSupport.getOafModelClasses());
 
         spark = SparkSession.builder().config(conf).getOrCreate();
     }
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/GraphHiveImporterJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/GraphHiveImporterJob.java
new file mode 100644
index 000000000..0270076dd
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/GraphHiveImporterJob.java
@@ -0,0 +1,69 @@
+package eu.dnetlib.dhp.oa.graph;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+public class GraphHiveImporterJob {
+
+    private static final Logger log = LoggerFactory.getLogger(GraphHiveImporterJob.class);
+
+    public static void main(String[] args) throws Exception {
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+                IOUtils.toString(GraphHiveImporterJob.class.getResourceAsStream(
+                        "/eu/dnetlib/dhp/oa/graph/input_graph_hive_parameters.json")));
+        parser.parseArgument(args);
+
+        Boolean isSparkSessionManaged = Optional
+                .ofNullable(parser.get("isSparkSessionManaged"))
+                .map(Boolean::valueOf)
+                .orElse(Boolean.TRUE);
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+        String inputPath = parser.get("inputPath");
+        log.info("inputPath: {}", inputPath);
+
+        String hiveMetastoreUris = parser.get("hiveMetastoreUris");
+        log.info("hiveMetastoreUris: {}", hiveMetastoreUris);
+
+        String hiveDbName = parser.get("hiveDbName");
+        log.info("hiveDbName: {}", hiveDbName);
+
+        SparkConf conf = new SparkConf();
+        conf.set("hive.metastore.uris", hiveMetastoreUris);
+
+        runWithSparkHiveSession(conf, isSparkSessionManaged,
+                spark -> loadGraphAsHiveDB(spark, inputPath, hiveDbName));
+    }
+
+    private static void loadGraphAsHiveDB(SparkSession spark, String inputPath, String hiveDbName) {
+
+        spark.sql(String.format("DROP DATABASE IF EXISTS %s CASCADE", hiveDbName));
+        spark.sql(String.format("CREATE DATABASE IF NOT EXISTS %s", hiveDbName));
+
+        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+        // Read the input file and convert it into RDD of serializable object
+        ModelSupport.oafTypes.forEach((name, clazz) -> spark.createDataset(sc.textFile(inputPath + "/" + name)
+                .map(s -> new ObjectMapper().readValue(s, clazz))
+                .rdd(), Encoders.bean(clazz))
+                .write()
+                .mode(SaveMode.Overwrite)
+                .saveAsTable(hiveDbName + "." + name));
+    }
+
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/SparkGraphImporterJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/SparkGraphImporterJob.java
deleted file mode 100644
index 44b534028..000000000
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/SparkGraphImporterJob.java
+++ /dev/null
@@ -1,62 +0,0 @@
-package eu.dnetlib.dhp.oa.graph;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.schema.common.ModelSupport;
-import org.apache.commons.io.IOUtils;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.SparkSession;
-
-public class SparkGraphImporterJob {
-
-    public static void main(String[] args) throws Exception {
-
-        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
-                IOUtils.toString(SparkGraphImporterJob.class.getResourceAsStream(
-                        "/eu/dnetlib/dhp/oa/graph/input_graph_parameters.json")));
-        parser.parseArgument(args);
-
-        new SparkGraphImporterJob().run(parser);
-    }
-
-    private void run(ArgumentApplicationParser parser) {
-        try(SparkSession spark = getSparkSession(parser)) {
-
-            final String inputPath = parser.get("sourcePath");
-            final String hiveDbName = parser.get("hive_db_name");
-
-            runWith(spark, inputPath, hiveDbName);
-        }
-    }
-
-    // protected for testing
-    protected void runWith(SparkSession spark, String inputPath, String hiveDbName) {
-
-        spark.sql(String.format("DROP DATABASE IF EXISTS %s CASCADE", hiveDbName));
-        spark.sql(String.format("CREATE DATABASE IF NOT EXISTS %s", hiveDbName));
-
-        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
-        // Read the input file and convert it into RDD of serializable object
-        ModelSupport.oafTypes.forEach((name, clazz) -> spark.createDataset(sc.textFile(inputPath + "/" + name)
-                .map(s -> new ObjectMapper().readValue(s, clazz))
-                .rdd(), Encoders.bean(clazz))
-                .write()
-                .mode(SaveMode.Overwrite)
-                .saveAsTable(hiveDbName + "." + name));
-    }
-
-    private static SparkSession getSparkSession(ArgumentApplicationParser parser) {
-        SparkConf conf = new SparkConf();
-        conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
-        return SparkSession
-                .builder()
-                .appName(SparkGraphImporterJob.class.getSimpleName())
-                .master(parser.get("master"))
-                .config(conf)
-                .enableHiveSupport()
-                .getOrCreate();
-    }
-}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/config-default.xml
similarity index 100%
rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/oozie_app/config-default.xml
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/config-default.xml
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/oozie_app/lib/scripts/postprocessing.sql b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/lib/scripts/postprocessing.sql
similarity index 100%
rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/oozie_app/lib/scripts/postprocessing.sql
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/lib/scripts/postprocessing.sql
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/workflow.xml
similarity index 97%
rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/oozie_app/workflow.xml
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/workflow.xml
index b523ca17a..271c7040f 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/workflow.xml
@@ -49,7 +49,7 @@
            <master>yarn</master>
            <mode>cluster</mode>
            <name>MapGraphAsHiveDB</name>
-            <class>eu.dnetlib.dhp.oa.graph.SparkGraphImporterJob</class>
+            <class>eu.dnetlib.dhp.oa.graph.GraphHiveImporterJob</class>
            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
            <spark-opts>
                --executor-memory ${sparkExecutorMemory}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_graph_hive_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_graph_hive_parameters.json
new file mode 100644
index 000000000..d6c13773a
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_graph_hive_parameters.json
@@ -0,0 +1,26 @@
+[
+  {
+    "paramName": "issm",
+    "paramLongName": "isSparkSessionManaged",
+    "paramDescription": "when true will stop SparkSession after job execution",
+    "paramRequired": false
+  },
+  {
+    "paramName": "in",
+    "paramLongName": "inputPath",
+    "paramDescription": "the path to the graph data dump to read",
+    "paramRequired": true
+  },
+  {
+    "paramName": "hmu",
+    "paramLongName": "hiveMetastoreUris",
+    "paramDescription": "the hive metastore uris",
+    "paramRequired": true
+  },
+  {
+    "paramName": "db",
+    "paramLongName": "hiveDbName",
+    "paramDescription": "the target hive database name",
+    "paramRequired": true
+  }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_graph_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_graph_parameters.json
deleted file mode 100644
index 13c7abd51..000000000
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_graph_parameters.json
+++ /dev/null
@@ -1,6 +0,0 @@
-[
-  {"paramName":"mt", "paramLongName":"master",              "paramDescription": "should be local or yarn",                 "paramRequired": true},
-  {"paramName":"s",  "paramLongName":"sourcePath",          "paramDescription": "the path of the sequencial file to read", "paramRequired": true},
-  {"paramName":"h",  "paramLongName":"hive_metastore_uris", "paramDescription": "the hive metastore uris",                 "paramRequired": true},
-  {"paramName":"db", "paramLongName":"hive_db_name",        "paramDescription": "the target hive database name",           "paramRequired": true}
-]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/GraphHiveImporterJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/GraphHiveImporterJobTest.java
new file mode 100644
index 000000000..29ca46d1d
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/GraphHiveImporterJobTest.java
@@ -0,0 +1,92 @@
+package eu.dnetlib.dhp.oa.graph;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.Oaf;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.*;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Objects;
+
+public class GraphHiveImporterJobTest {
+
+    private static final Logger log = LoggerFactory.getLogger(GraphHiveImporterJobTest.class);
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    private static final ClassLoader cl = GraphHiveImporterJobTest.class.getClassLoader();
+
+    public static final String JDBC_DERBY_TEMPLATE = "jdbc:derby:;databaseName=%s/junit_metastore_db;create=true";
+
+    private static SparkSession spark;
+
+    private static Path workingDir;
+
+    private static String dbName;
+
+    @BeforeAll
+    public static void beforeAll() throws IOException {
+        workingDir = Files.createTempDirectory(GraphHiveImporterJobTest.class.getSimpleName());
+        log.info("using work dir {}", workingDir);
+
+        dbName = RandomStringUtils.randomAlphabetic(5);
+        log.info("using DB name {}", "test_" + dbName);
+
+        SparkConf conf = new SparkConf();
+        conf.setAppName(GraphHiveImporterJobTest.class.getSimpleName());
+
+        conf.setMaster("local[*]");
+        conf.set("spark.driver.host", "localhost");
+        conf.set("hive.metastore.local", "true");
+        conf.set("spark.ui.enabled", "false");
+        conf.set("spark.sql.warehouse.dir", workingDir.toString());
+        conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
+        conf.set("javax.jdo.option.ConnectionURL", String.format(JDBC_DERBY_TEMPLATE, workingDir.resolve("warehouse").toString()));
+
+        spark = SparkSession
+                .builder()
+                .appName(GraphHiveImporterJobTest.class.getSimpleName())
+                .config(conf)
+                .enableHiveSupport()
+                .getOrCreate();
+    }
+
+    @AfterAll
+    public static void afterAll() throws IOException {
+        FileUtils.deleteDirectory(workingDir.toFile());
+        spark.stop();
+    }
+
+    @Test
+    public void testImportGraphAsHiveDB() throws Exception {
+
+        GraphHiveImporterJob.main(new String[]{
+                "-isSparkSessionManaged", Boolean.FALSE.toString(),
+                "-inputPath", getClass().getResource("/eu/dnetlib/dhp/oa/graph/sample").getPath(),
+                "-hiveMetastoreUris", "",
+                "-hiveDbName", dbName
+        });
+
+        ModelSupport.oafTypes.forEach((name, clazz) -> {
+            long count = spark.read().table(dbName + "." + name).count();
+            int expected = name.equals("relation") ? 100 : 10;
+
+            Assertions.assertEquals(expected, count, String.format("%s should be %s", name, expected));
+        });
+
+    }
+
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/SparkGraphImporterJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/SparkGraphImporterJobTest.java
deleted file mode 100644
index 302cef8d6..000000000
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/SparkGraphImporterJobTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package eu.dnetlib.dhp.oa.graph;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.sql.SparkSession;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-
-import java.nio.file.Path;
-
-public class SparkGraphImporterJobTest {
-
-    private final static String TEST_DB_NAME = "test";
-
-    @Test
-    public void testImport(@TempDir Path outPath) {
-        try(SparkSession spark = testSparkSession(outPath.toString())) {
-
-            new SparkGraphImporterJob().runWith(
-                    spark,
-                    getClass().getResource("/eu/dnetlib/dhp/oa/graph/sample").getPath(),
-                    TEST_DB_NAME);
-
-            GraphMappingUtils.types.forEach((name, clazz) -> {
-                final long count = spark.read().table(TEST_DB_NAME + "." + name).count();
-                if (name.equals("relation")) {
-                    Assertions.assertEquals(100, count, String.format("%s should be 100", name));
-                } else {
-                    Assertions.assertEquals(10, count, String.format("%s should be 10", name));
-                }
-            });
-        }
-    }
-
-    private SparkSession testSparkSession(final String inputPath) {
-        SparkConf conf = new SparkConf();
-
-        conf.set("spark.driver.host", "localhost");
-        conf.set("hive.metastore.local", "true");
-        conf.set("hive.metastore.warehouse.dir", inputPath + "/warehouse");
-        conf.set("spark.sql.warehouse.dir", inputPath);
-        conf.set("javax.jdo.option.ConnectionURL", String.format("jdbc:derby:;databaseName=%s/junit_metastore_db;create=true", inputPath));
-        conf.set("spark.ui.enabled", "false");
-
-        return SparkSession
-                .builder()
-                .appName(SparkGraphImporterJobTest.class.getSimpleName())
-                .master("local[*]")
-                .config(conf)
-                .enableHiveSupport()
-                .getOrCreate();
-    }
-
-}
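[Editorial note, not part of either patch: a condensed sketch of how the new GraphHiveImporterJob is driven and verified, mirroring GraphHiveImporterJobTest above. The input path and database name are illustrative; -hiveMetastoreUris is left empty so the job uses the local metastore configured on the SparkConf, exactly as the test does.]

    // assumes a SparkSession 'spark' already configured with Hive support, as in beforeAll() above
    GraphHiveImporterJob.main(new String[]{
            "-isSparkSessionManaged", Boolean.FALSE.toString(),
            "-inputPath", "/tmp/graph_sample",   // hypothetical graph dump location
            "-hiveMetastoreUris", "",
            "-hiveDbName", "graph_test"          // hypothetical database name
    });

    // each OAF entity type lands in its own table, e.g. graph_test.publication
    long publications = spark.read().table("graph_test.publication").count();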