forked from D-Net/dnet-hadoop
first implementation of incremental update of scholix index
This commit is contained in:
parent
62cc257e5c
commit
cd7416ae4c
|
@ -29,7 +29,7 @@ import java.util.List;
|
|||
public class SparkCreateConnectedComponent {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateConnectedComponent.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/dedup_parameters.json")));
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateConnectedComponent.class.getResourceAsStream("/eu/dnetlib/dhp/sx/dedup/dedup_parameters.json")));
|
||||
parser.parseArgument(args);
|
||||
final SparkSession spark = SparkSession
|
||||
.builder()
|
||||
|
|
|
@ -11,7 +11,7 @@ import org.apache.spark.sql.SparkSession;
|
|||
|
||||
public class SparkCreateDedupRecord {
|
||||
public static void main(String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateDedupRecord.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/dedupRecord_parameters.json")));
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateDedupRecord.class.getResourceAsStream("/eu/dnetlib/dhp/sx/dedup/dedupRecord_parameters.json")));
|
||||
parser.parseArgument(args);
|
||||
final SparkSession spark = SparkSession
|
||||
.builder()
|
||||
|
|
|
@ -29,7 +29,7 @@ import java.util.List;
|
|||
public class SparkCreateSimRels {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/dedup_parameters.json")));
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/sx/dedup/dedup_parameters.json")));
|
||||
parser.parseArgument(args);
|
||||
final SparkSession spark = SparkSession
|
||||
.builder()
|
||||
|
|
|
@ -23,7 +23,7 @@ public class SparkPropagateRelationsJob {
|
|||
final static String TARGETJSONPATH = "$.target";
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkPropagateRelationsJob.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/dedup_propagate_relation_parameters.json")));
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkPropagateRelationsJob.class.getResourceAsStream("/eu/dnetlib/dhp/sx/dedup/dedup_propagate_relation_parameters.json")));
|
||||
parser.parseArgument(args);
|
||||
final SparkSession spark = SparkSession
|
||||
.builder()
|
||||
|
|
|
@ -26,7 +26,7 @@ public class SparkUpdateEntityJob {
|
|||
final static String IDJSONPATH = "$.id";
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkUpdateEntityJob.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/dedup_delete_by_inference_parameters.json")));
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkUpdateEntityJob.class.getResourceAsStream("/eu/dnetlib/dhp/sx/dedup/dedup_delete_by_inference_parameters.json")));
|
||||
parser.parseArgument(args);
|
||||
final SparkSession spark = SparkSession
|
||||
.builder()
|
||||
|
|
|
@ -0,0 +1,33 @@
|
|||
[
|
||||
{
|
||||
"paramName": "mt",
|
||||
"paramLongName": "master",
|
||||
"paramDescription": "should be local or yarn",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "s",
|
||||
"paramLongName": "sourcePath",
|
||||
"paramDescription": "the path of the sequential file to read",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "e",
|
||||
"paramLongName": "entity",
|
||||
"paramDescription": "the type of entity to be deduped",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "c",
|
||||
"paramLongName": "dedupConf",
|
||||
"paramDescription": "dedup configuration to be used",
|
||||
"compressed": true,
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "d",
|
||||
"paramLongName": "dedupPath",
|
||||
"paramDescription": "dedup path to load mergeRelation",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
|
@ -0,0 +1,33 @@
|
|||
[
|
||||
{
|
||||
"paramName": "mt",
|
||||
"paramLongName": "master",
|
||||
"paramDescription": "should be local or yarn",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "s",
|
||||
"paramLongName": "sourcePath",
|
||||
"paramDescription": "the path of the sequential file to read",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "e",
|
||||
"paramLongName": "entity",
|
||||
"paramDescription": "the type of entity to be deduped",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "c",
|
||||
"paramLongName": "dedupConf",
|
||||
"paramDescription": "dedup configuration to be used",
|
||||
"compressed": true,
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "t",
|
||||
"paramLongName": "targetPath",
|
||||
"paramDescription": "target path to save dedup result",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
|
@ -49,7 +49,7 @@
|
|||
<mode>cluster</mode>
|
||||
<name>Create Similarity Relations</name>
|
||||
<class>eu.dnetlib.dedup.SparkCreateSimRels</class>
|
||||
<jar>dhp-dedup-${projectVersion}.jar</jar>
|
||||
<jar>dhp-dedup-scholexplorer-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-memory ${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
|
@ -73,7 +73,7 @@
|
|||
<mode>cluster</mode>
|
||||
<name>Create Connected Components</name>
|
||||
<class>eu.dnetlib.dedup.SparkCreateConnectedComponent</class>
|
||||
<jar>dhp-dedup-${projectVersion}.jar</jar>
|
||||
<jar>dhp-dedup-scholexplorer-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-memory ${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
|
@ -97,7 +97,7 @@
|
|||
<mode>cluster</mode>
|
||||
<name>Create Dedup Record</name>
|
||||
<class>eu.dnetlib.dedup.SparkCreateDedupRecord</class>
|
||||
<jar>dhp-dedup-${projectVersion}.jar</jar>
|
||||
<jar>dhp-dedup-scholexplorer-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-memory ${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
|
@ -121,7 +121,7 @@
|
|||
<mode>cluster</mode>
|
||||
<name>Propagate Dedup Relations</name>
|
||||
<class>eu.dnetlib.dedup.sx.SparkPropagateRelationsJob</class>
|
||||
<jar>dhp-dedup-${projectVersion}.jar</jar>
|
||||
<jar>dhp-dedup-scholexplorer-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-memory ${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
|
@ -145,7 +145,7 @@
|
|||
<mode>cluster</mode>
|
||||
<name>Update ${entity} and add DedupRecord</name>
|
||||
<class>eu.dnetlib.dedup.sx.SparkUpdateEntityJob</class>
|
||||
<jar>dhp-dedup-${projectVersion}.jar</jar>
|
||||
<jar>dhp-dedup-scholexplorer-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-memory ${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
|
|
|
@ -0,0 +1,203 @@
|
|||
package eu.dnetlib.dhp.provision;
|
||||
|
||||
import com.jayway.jsonpath.JsonPath;
|
||||
import eu.dnetlib.dhp.provision.scholix.*;
|
||||
import eu.dnetlib.dhp.utils.DHPUtils;
|
||||
import eu.dnetlib.scholexplorer.relation.RelInfo;
|
||||
import eu.dnetlib.scholexplorer.relation.RelationMapper;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class Datacite2Scholix {
|
||||
|
||||
|
||||
final RelationMapper relationMapper;
|
||||
|
||||
public Datacite2Scholix(RelationMapper relationMapper) {
|
||||
this.relationMapper = relationMapper;
|
||||
}
|
||||
|
||||
|
||||
public List<Scholix> generateScholixFromJson(final String dJson) {
|
||||
|
||||
List<Map<String, String>> relIds = getRelatedIendtifiers(dJson);
|
||||
relIds = relIds!= null ? relIds.stream().filter(m->
|
||||
m.containsKey("relatedIdentifierType") && m.containsKey("relationType" ) && m.containsKey( "relatedIdentifier")
|
||||
).collect(Collectors.toList()) : null;
|
||||
if(relIds== null || relIds.size() ==0 )
|
||||
return null;
|
||||
|
||||
|
||||
|
||||
final String updated = JsonPath.read(dJson,"$.attributes.updated" );
|
||||
ScholixResource resource = generateDataciteScholixResource(dJson);
|
||||
|
||||
return relIds.stream().flatMap(s-> {
|
||||
final List<Scholix> result = generateScholix(resource, s.get("relatedIdentifier"), s.get("relatedIdentifierType"), s.get("relationType"), updated);
|
||||
return result.stream();
|
||||
}).collect(Collectors.toList());
|
||||
|
||||
}
|
||||
|
||||
|
||||
private List<Scholix> generateScholix(ScholixResource source, final String pid, final String pidtype, final String relType, final String updated) {
|
||||
|
||||
|
||||
if ("doi".equalsIgnoreCase(pidtype)) {
|
||||
ScholixResource target = new ScholixResource();
|
||||
target.setIdentifier(Collections.singletonList(new ScholixIdentifier(pid, pidtype)));
|
||||
final RelInfo relInfo = relationMapper.get(relType.toLowerCase());
|
||||
final ScholixRelationship rel = new ScholixRelationship(relInfo.getOriginal(), "datacite", relInfo.getInverse());
|
||||
final ScholixEntityId provider = source.getCollectedFrom().get(0).getProvider();
|
||||
final Scholix s = new Scholix();
|
||||
s.setSource(source);
|
||||
s.setTarget(target);
|
||||
s.setLinkprovider(Collections.singletonList(provider));
|
||||
s.setPublisher(source.getPublisher());
|
||||
s.setRelationship(rel);
|
||||
s.setPublicationDate(updated);
|
||||
return Collections.singletonList(s);
|
||||
} else {
|
||||
final List<Scholix> result = new ArrayList<>();
|
||||
ScholixResource target = new ScholixResource();
|
||||
target.setIdentifier(Collections.singletonList(new ScholixIdentifier(pid, pidtype)));
|
||||
target.setDnetIdentifier(generateId(pid, pidtype, "unknown"));
|
||||
target.setObjectType("unknown");
|
||||
target.setCollectedFrom(generateDataciteCollectedFrom("incomplete"));
|
||||
final RelInfo relInfo = relationMapper.get(relType.toLowerCase());
|
||||
final ScholixRelationship rel = new ScholixRelationship(relInfo.getOriginal(), "datacite", relInfo.getInverse());
|
||||
final ScholixEntityId provider = source.getCollectedFrom().get(0).getProvider();
|
||||
final Scholix s = new Scholix();
|
||||
s.setSource(source);
|
||||
s.setTarget(target);
|
||||
s.setLinkprovider(Collections.singletonList(provider));
|
||||
s.setPublisher(source.getPublisher());
|
||||
s.setRelationship(rel);
|
||||
s.setPublicationDate(updated);
|
||||
s.generateIdentifier();
|
||||
result.add(s);
|
||||
final Scholix s2 = new Scholix();
|
||||
s2.setSource(target);
|
||||
s2.setTarget(source);
|
||||
s2.setLinkprovider(Collections.singletonList(provider));
|
||||
s2.setPublisher(source.getPublisher());
|
||||
s2.setRelationship(new ScholixRelationship(relInfo.getInverse(), "datacite", relInfo.getOriginal()));
|
||||
s2.setPublicationDate(updated);
|
||||
s2.generateIdentifier();
|
||||
result.add(s2);
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
public ScholixResource generateDataciteScholixResource(String dJson) {
|
||||
ScholixResource resource = new ScholixResource();
|
||||
String DOI_PATH = "$.attributes.doi";
|
||||
final String doi = JsonPath.read(dJson, DOI_PATH);
|
||||
resource.setIdentifier(Collections.singletonList(new ScholixIdentifier(doi, "doi")));
|
||||
resource.setObjectType(getType(dJson));
|
||||
resource.setDnetIdentifier(generateId(doi, "doi", resource.getObjectType()));
|
||||
resource.setCollectedFrom(generateDataciteCollectedFrom("complete"));
|
||||
final String publisher = JsonPath.read(dJson, "$.attributes.publisher");
|
||||
if (StringUtils.isNotBlank(publisher))
|
||||
resource.setPublisher(Collections.singletonList(new ScholixEntityId(publisher, null)));
|
||||
final String date = getDate(dJson);
|
||||
if (StringUtils.isNotBlank(date))
|
||||
resource.setPublicationDate(date);
|
||||
final String title = getTitle(dJson);
|
||||
if(StringUtils.isNotBlank(title))
|
||||
resource.setTitle(title);
|
||||
resource.setCreator(getCreators(dJson));
|
||||
return resource;
|
||||
}
|
||||
|
||||
private List<ScholixEntityId> getCreators(final String json) {
|
||||
final List<String> creatorName = JsonPath.read(json, "$.attributes.creators[*].name");
|
||||
if (creatorName!= null && creatorName.size() >0) {
|
||||
return creatorName.stream().map(s-> new ScholixEntityId(s, null)).collect(Collectors.toList());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private String getTitle(final String json){
|
||||
final List<String> titles = JsonPath.read(json, "$.attributes.titles[*].title");
|
||||
return titles!= null && titles.size()>0?titles.get(0): null;
|
||||
}
|
||||
|
||||
private String getDate(final String json) {
|
||||
final List<Map<String,String>> dates = JsonPath.read(json,"$.attributes.dates");
|
||||
if(dates!= null && dates.size()>0){
|
||||
|
||||
List<Map<String, String>> issued = dates.stream().filter(s -> "issued".equalsIgnoreCase(s.get("dateType"))).collect(Collectors.toList());
|
||||
if (issued.size()>0)
|
||||
return issued.get(0).get("date");
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private List<ScholixCollectedFrom> generateDataciteCollectedFrom(final String completionStatus) {
|
||||
final ScholixEntityId scholixEntityId = new ScholixEntityId("Datasets in Datacite",
|
||||
Collections.singletonList(new ScholixIdentifier("dli_________::datacite", "dnet_identifier")));
|
||||
return Collections.singletonList(
|
||||
new ScholixCollectedFrom(
|
||||
scholixEntityId,"collected", completionStatus));
|
||||
}
|
||||
|
||||
private String getType(final String json) {
|
||||
try {
|
||||
final String bibtext = JsonPath.read(json, "$.attributes.types.bibtex");
|
||||
if ("article".equalsIgnoreCase(bibtext)) {
|
||||
return "publication";
|
||||
}
|
||||
return "dataset";
|
||||
} catch (Throwable e) {
|
||||
return "dataset";
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
private List<Map<String, String>> getRelatedIendtifiers(final String json) {
|
||||
String REL_IDENTIFIER_PATH = "$.attributes.relatedIdentifiers[*]";
|
||||
List<Map<String, String>> res = JsonPath.read(json, REL_IDENTIFIER_PATH);
|
||||
return res;
|
||||
|
||||
}
|
||||
|
||||
protected String generateId(final String pid, final String pidType, final String entityType) {
|
||||
String type;
|
||||
switch (entityType){
|
||||
case "publication":
|
||||
type = "50|";
|
||||
break;
|
||||
case "dataset":
|
||||
type = "60|";
|
||||
break;
|
||||
case "unknown":
|
||||
type = "70|";
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("unexpected value "+entityType);
|
||||
|
||||
}
|
||||
|
||||
return type+ DHPUtils.md5(String.format("%s::%s", pid.toLowerCase().trim(), pidType.toLowerCase().trim()));
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,114 @@
|
|||
package eu.dnetlib.dhp.provision;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.jayway.jsonpath.JsonPath;
|
||||
import net.minidev.json.JSONArray;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||
import org.apache.http.client.methods.HttpPost;
|
||||
import org.apache.http.entity.StringEntity;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.apache.http.impl.client.HttpClients;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class DataciteClient implements Iterator<String> {
|
||||
|
||||
final static String blobPath = "$.hits.hits[*]._source";
|
||||
final static String scrollIdPath = "$._scroll_id";
|
||||
|
||||
String scrollId;
|
||||
|
||||
List<String> buffer;
|
||||
|
||||
final String esHost;
|
||||
final String esIndex;
|
||||
final ObjectMapper mapper = new ObjectMapper();
|
||||
|
||||
public DataciteClient(final String esHost, final String esIndex, final long timestamp) throws IOException {
|
||||
|
||||
this.esHost = esHost;
|
||||
this.esIndex = esIndex;
|
||||
final String body =getResponse(String.format("http://%s:9200/%s/_search?scroll=1m", esHost, esIndex), String.format("{\"size\":1000, \"query\":{\"range\":{\"timestamp\":{\"gte\":%d}}}}", timestamp));
|
||||
scrollId= getJPathString(scrollIdPath, body);
|
||||
buffer = getBlobs(body);
|
||||
|
||||
}
|
||||
|
||||
|
||||
public String getResponse(final String url,final String json ) {
|
||||
CloseableHttpClient client = HttpClients.createDefault();
|
||||
try {
|
||||
|
||||
HttpPost httpPost = new HttpPost(url);
|
||||
if (json!= null) {
|
||||
StringEntity entity = new StringEntity(json);
|
||||
httpPost.setEntity(entity);
|
||||
httpPost.setHeader("Accept", "application/json");
|
||||
httpPost.setHeader("Content-type", "application/json");
|
||||
}
|
||||
CloseableHttpResponse response = client.execute(httpPost);
|
||||
|
||||
return IOUtils.toString(response.getEntity().getContent());
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException("Error on executing request ",e);
|
||||
} finally {
|
||||
try {
|
||||
client.close();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("Unable to close client ",e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private String getJPathString(final String jsonPath, final String json) {
|
||||
try {
|
||||
Object o = JsonPath.read(json, jsonPath);
|
||||
if (o instanceof String)
|
||||
return (String) o;
|
||||
return null;
|
||||
} catch (Exception e) {
|
||||
return "";
|
||||
}
|
||||
}
|
||||
|
||||
private List<String> getBlobs(final String body) {
|
||||
JSONArray array = JsonPath.read(body, blobPath);
|
||||
return array.stream().map(
|
||||
o -> {
|
||||
try {
|
||||
return mapper.writeValueAsString(o);
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return (buffer!= null && !buffer.isEmpty());
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public String next() {
|
||||
final String nextItem = buffer.remove(0);
|
||||
if (buffer.isEmpty()) {
|
||||
final String json_param = String.format("{\"scroll_id\":\"%s\",\"scroll\" : \"1m\"}", scrollId);
|
||||
final String body =getResponse(String.format("http://%s:9200/_search/scroll", esHost), json_param);
|
||||
try {
|
||||
buffer = getBlobs(body);
|
||||
} catch (Throwable e) {
|
||||
System.out.println(body);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
return nextItem;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,44 @@
|
|||
package eu.dnetlib.dhp.provision;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.IntWritable;
|
||||
import org.apache.hadoop.io.SequenceFile;
|
||||
import org.apache.hadoop.io.Text;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
public class RetrieveUpdateFromDatacite {
|
||||
|
||||
public static void main(String[] args) throws Exception{
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(RetrieveUpdateFromDatacite.class.getResourceAsStream("/eu/dnetlib/dhp/provision/retrieve_update_parameters.json")));
|
||||
parser.parseArgument(args);
|
||||
final String hdfsuri = parser.get("namenode");
|
||||
Path hdfswritepath = new Path(parser.get("targetPath"));
|
||||
final String timestamp = parser.get("timestamp");
|
||||
|
||||
|
||||
// ====== Init HDFS File System Object
|
||||
Configuration conf = new Configuration();
|
||||
// Set FileSystem URI
|
||||
conf.set("fs.defaultFS", hdfsuri);
|
||||
// Because of Maven
|
||||
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
|
||||
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
|
||||
|
||||
FileSystem.get(URI.create(hdfsuri), conf);
|
||||
|
||||
final AtomicInteger counter = new AtomicInteger(0);
|
||||
try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
|
||||
SequenceFile.Writer.file(hdfswritepath), SequenceFile.Writer.keyClass(IntWritable.class),
|
||||
SequenceFile.Writer.valueClass(Text.class))) {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -44,35 +44,8 @@ public class SparkExtractRelationCount {
|
|||
final String workingDirPath = parser.get("workingDirPath");
|
||||
|
||||
final String relationPath = parser.get("relationPath");
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
DatasetJoiner.startJoin(spark, relationPath,workingDirPath + "/relatedItemCount");
|
||||
|
||||
|
||||
|
||||
|
||||
// sc.textFile(relationPath)
|
||||
// // We start to Filter the relation not deleted by Inference
|
||||
// .filter(ProvisionUtil::isNotDeleted)
|
||||
// // Then we create a PairRDD<String, RelatedItem>
|
||||
// .mapToPair((PairFunction<String, String, RelatedItemInfo>) f
|
||||
// -> new Tuple2<>(DHPUtils.getJPathString(ProvisionUtil.SOURCEJSONPATH, f), ProvisionUtil.getItemType(f, ProvisionUtil.TARGETJSONPATH)))
|
||||
// //We reduce and sum the number of Relations
|
||||
// .reduceByKey((Function2<RelatedItemInfo, RelatedItemInfo, RelatedItemInfo>) (v1, v2) -> {
|
||||
// if (v1 == null && v2 == null)
|
||||
// return new RelatedItemInfo();
|
||||
// return v1 != null ? v1.add(v2) : v2;
|
||||
// })
|
||||
// //Set the source Id in RelatedItem object
|
||||
// .map(k -> k._2().setId(k._1()))
|
||||
// // Convert to JSON and save as TextFile
|
||||
// .map(k -> {
|
||||
// ObjectMapper mapper = new ObjectMapper();
|
||||
// return mapper.writeValueAsString(k);
|
||||
// }).saveAsTextFile(workingDirPath + "/relatedItemCount", GzipCodec.class);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -8,10 +8,6 @@
|
|||
<name>graphPath</name>
|
||||
<description>the graph path</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>index</name>
|
||||
<description>index name</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkDriverMemory</name>
|
||||
<description>memory for driver process</description>
|
||||
|
@ -21,17 +17,16 @@
|
|||
<description>memory for individual executor</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorCores</name>
|
||||
<description>number of cores used by single executor</description>
|
||||
<name>index</name>
|
||||
<description>index name</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>idScholix</name>
|
||||
<description>the </description>
|
||||
<description>the identifier name of the scholix </description>
|
||||
</property>
|
||||
<property>
|
||||
<name>idSummary</name>
|
||||
<description>number of cores used by single executor</description>
|
||||
<description>the identifier name of the summary</description>
|
||||
</property>
|
||||
</parameters>
|
||||
|
||||
|
@ -57,7 +52,7 @@
|
|||
<mode>cluster</mode>
|
||||
<name>calculate for each ID the number of related Dataset, publication and Unknown</name>
|
||||
<class>eu.dnetlib.dhp.provision.SparkExtractRelationCount</class>
|
||||
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
|
||||
<jar>dhp-graph-provision-scholexplorer-${projectVersion}.jar</jar>
|
||||
<spark-opts>--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT}</spark-opts>
|
||||
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||
<arg>--workingDirPath</arg><arg>${workingDirPath}</arg>
|
||||
|
@ -75,7 +70,7 @@
|
|||
<mode>cluster</mode>
|
||||
<name>generate Summary</name>
|
||||
<class>eu.dnetlib.dhp.provision.SparkGenerateSummary</class>
|
||||
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
|
||||
<jar>dhp-graph-provision-scholexplorer-${projectVersion}.jar</jar>
|
||||
<spark-opts>--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT}</spark-opts>
|
||||
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||
<arg>--workingDirPath</arg><arg>${workingDirPath}</arg>
|
||||
|
@ -93,7 +88,7 @@
|
|||
<mode>cluster</mode>
|
||||
<name>generate Scholix</name>
|
||||
<class>eu.dnetlib.dhp.provision.SparkGenerateScholix</class>
|
||||
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
|
||||
<jar>dhp-graph-provision-scholexplorer-${projectVersion}.jar</jar>
|
||||
<spark-opts>--executor-memory 6G --driver-memory=${sparkDriverMemory} ${sparkExtraOPT}</spark-opts>
|
||||
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||
<arg>--workingDirPath</arg><arg>${workingDirPath}</arg>
|
||||
|
@ -111,8 +106,8 @@
|
|||
<mode>cluster</mode>
|
||||
<name>index Summary</name>
|
||||
<class>eu.dnetlib.dhp.provision.SparkIndexCollectionOnES</class>
|
||||
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
|
||||
<spark-opts>--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="64" </spark-opts>
|
||||
<jar>dhp-graph-provision-scholexplorer-${projectVersion}.jar</jar>
|
||||
<spark-opts>--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="32" </spark-opts>
|
||||
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||
<arg>--sourcePath</arg><arg>${workingDirPath}/summary</arg>
|
||||
<arg>--index</arg><arg>${index}_object</arg>
|
||||
|
@ -131,7 +126,7 @@
|
|||
<mode>cluster</mode>
|
||||
<name>index scholix</name>
|
||||
<class>eu.dnetlib.dhp.provision.SparkIndexCollectionOnES</class>
|
||||
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
|
||||
<jar>dhp-graph-provision-scholexplorer-${projectVersion}.jar</jar>
|
||||
<spark-opts>--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="8" </spark-opts>
|
||||
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||
<arg>--sourcePath</arg><arg>${workingDirPath}/scholix_json</arg>
|
|
@ -0,0 +1,109 @@
|
|||
package eu.dnetlib.dhp.provision;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import eu.dnetlib.dhp.provision.scholix.Scholix;
|
||||
import eu.dnetlib.scholexplorer.relation.RelationMapper;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||
import org.apache.http.client.methods.HttpPost;
|
||||
import org.apache.http.entity.StringEntity;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.apache.http.impl.client.HttpClients;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
|
||||
public class DataciteClientTest {
|
||||
|
||||
|
||||
@Test
|
||||
public void dataciteSCholixTest() throws Exception {
|
||||
final String json = IOUtils.toString(getClass().getResourceAsStream("datacite.json"));
|
||||
final RelationMapper mapper = RelationMapper.load();
|
||||
|
||||
Datacite2Scholix ds = new Datacite2Scholix(mapper);
|
||||
final List<Scholix> s = ds.generateScholixFromJson(json);
|
||||
|
||||
|
||||
System.out.println(new ObjectMapper().writeValueAsString(s));
|
||||
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testClient() throws Exception {
|
||||
DataciteClient client = new DataciteClient("ip-90-147-167-25.ct1.garrservices.it","datacite",1585454082);
|
||||
int i = 0;
|
||||
final RelationMapper mapper = RelationMapper.load();
|
||||
|
||||
Datacite2Scholix ds = new Datacite2Scholix(mapper);
|
||||
BufferedWriter writer = new BufferedWriter(new FileWriter("/Users/sandro/new_s.txt"));
|
||||
|
||||
final ObjectMapper m = new ObjectMapper();
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
while (client.hasNext()){
|
||||
i ++;
|
||||
|
||||
|
||||
final String next = client.next();
|
||||
try {
|
||||
final List<Scholix> res = ds.generateScholixFromJson(next);
|
||||
if (res!= null)
|
||||
res
|
||||
.forEach(
|
||||
s -> {
|
||||
try {
|
||||
|
||||
writer.write(m.writeValueAsString(s));
|
||||
writer.write("\n");
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
);
|
||||
}catch (Throwable t) {
|
||||
System.out.println(next);
|
||||
throw new RuntimeException(t);
|
||||
}
|
||||
if(i %1000 == 0) {
|
||||
System.out.println("added "+i);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private String getResponse(final String url,final String json ) {
|
||||
CloseableHttpClient client = HttpClients.createDefault();
|
||||
try {
|
||||
|
||||
HttpPost httpPost = new HttpPost(url);
|
||||
if (json!= null) {
|
||||
StringEntity entity = new StringEntity(json);
|
||||
httpPost.setEntity(entity);
|
||||
httpPost.setHeader("Accept", "application/json");
|
||||
httpPost.setHeader("Content-type", "application/json");
|
||||
}
|
||||
CloseableHttpResponse response = client.execute(httpPost);
|
||||
|
||||
return IOUtils.toString(response.getEntity().getContent());
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException("Error on executing request ",e);
|
||||
} finally {
|
||||
try {
|
||||
client.close();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("Unable to close client ",e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,136 @@
|
|||
{
|
||||
"relationships": {
|
||||
"client": {
|
||||
"data": {
|
||||
"type": "clients",
|
||||
"id": "crossref.citations"
|
||||
}
|
||||
}
|
||||
},
|
||||
"attributes": {
|
||||
"contributors": [
|
||||
],
|
||||
"titles": [
|
||||
{
|
||||
"title": "UV-visible spectroscopy in the interpretation of the tautomeric equilibrium of N,N′(bis-3,5-di-bromo-salicyliden)-1,2-diaminobenzene and the redox activity of its Co(II) complex. A quantum chemical approach."
|
||||
}
|
||||
],
|
||||
"descriptions": [
|
||||
],
|
||||
"referenceCount": 0,
|
||||
"subjects": [
|
||||
],
|
||||
"container": {
|
||||
"title": "Journal of Molecular Structure: THEOCHEM",
|
||||
"firstPage": "97",
|
||||
"volume": "367",
|
||||
"lastPage": "110",
|
||||
"identifierType": "ISSN",
|
||||
"identifier": "0166-1280",
|
||||
"type": "Journal"
|
||||
},
|
||||
"state": "findable",
|
||||
"created": "2020-03-26T13:31:57.000Z",
|
||||
"source": "levriero",
|
||||
"metadataVersion": 0,
|
||||
"version": null,
|
||||
"isActive": true,
|
||||
"contentUrl": null,
|
||||
"geoLocations": [
|
||||
],
|
||||
"updated": "2020-03-26T13:31:58.000Z",
|
||||
"fundingReferences": [
|
||||
],
|
||||
"viewCount": 0,
|
||||
"registered": "2020-03-26T13:31:58.000Z",
|
||||
"published": "1996",
|
||||
"dates": [
|
||||
{
|
||||
"date": "1996-09",
|
||||
"dateType": "Issued"
|
||||
},
|
||||
{
|
||||
"date": "2019-04-17T13:58:25Z",
|
||||
"dateType": "Updated"
|
||||
}
|
||||
],
|
||||
"relatedIdentifiers": [
|
||||
{
|
||||
"relationType": "IsPartOf",
|
||||
"relatedIdentifier": "0166-1280",
|
||||
"relatedIdentifierType": "ISSN",
|
||||
"resourceTypeGeneral": "Collection"
|
||||
}
|
||||
],
|
||||
"reason": null,
|
||||
"rightsList": [
|
||||
{
|
||||
"rightsUri": "https://www.elsevier.com/tdm/userlicense/1.0"
|
||||
}
|
||||
],
|
||||
"schemaVersion": "http://datacite.org/schema/kernel-4",
|
||||
"types": {
|
||||
"resourceType": "JournalArticle",
|
||||
"ris": "JOUR",
|
||||
"resourceTypeGeneral": "Text",
|
||||
"bibtex": "article",
|
||||
"citeproc": "article-journal",
|
||||
"schemaOrg": "ScholarlyArticle"
|
||||
},
|
||||
"publisher": "Elsevier BV",
|
||||
"publicationYear": 1996,
|
||||
"doi": "10.1016/s0166-1280(96)04575-7",
|
||||
"language": null,
|
||||
"sizes": [
|
||||
],
|
||||
"url": "https://linkinghub.elsevier.com/retrieve/pii/S0166128096045757",
|
||||
"identifiers": [
|
||||
{
|
||||
"identifier": "https://doi.org/10.1016/s0166-1280(96)04575-7",
|
||||
"identifierType": "DOI"
|
||||
},
|
||||
{
|
||||
"identifier": "S0166128096045757",
|
||||
"identifierType": "Publisher ID"
|
||||
}
|
||||
],
|
||||
"citationCount": 0,
|
||||
"formats": [
|
||||
],
|
||||
"downloadCount": 0,
|
||||
"creators": [
|
||||
{
|
||||
"nameType": "Personal",
|
||||
"givenName": "G.L.",
|
||||
"name": "Estiú, G.L.",
|
||||
"familyName": "Estiú",
|
||||
"affiliation": [
|
||||
]
|
||||
},
|
||||
{
|
||||
"nameType": "Personal",
|
||||
"givenName": "A.H.",
|
||||
"name": "Jubert, A.H.",
|
||||
"familyName": "Jubert",
|
||||
"affiliation": [
|
||||
]
|
||||
},
|
||||
{
|
||||
"nameType": "Personal",
|
||||
"givenName": "J.",
|
||||
"name": "Costamagna, J.",
|
||||
"familyName": "Costamagna",
|
||||
"affiliation": [
|
||||
]
|
||||
},
|
||||
{
|
||||
"nameType": "Personal",
|
||||
"givenName": "J.",
|
||||
"name": "Vargas, J.",
|
||||
"familyName": "Vargas",
|
||||
"affiliation": [
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue