forked from D-Net/dnet-hadoop
merge branch with master
This commit is contained in:
commit
f8e9bda24c
|
@ -1,119 +1,117 @@
|
||||||
package eu.dnetlib.dhp.common;
|
|
||||||
|
|
||||||
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
|
package eu.dnetlib.dhp.common;
|
||||||
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
|
|
||||||
import org.apache.hadoop.fs.*;
|
|
||||||
|
|
||||||
import java.io.BufferedInputStream;
|
import java.io.BufferedInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
|
||||||
|
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
|
||||||
|
import org.apache.hadoop.fs.*;
|
||||||
|
|
||||||
public class MakeTarArchive implements Serializable {
|
public class MakeTarArchive implements Serializable {
|
||||||
|
|
||||||
private static TarArchiveOutputStream getTar(FileSystem fileSystem, String outputPath) throws IOException {
|
private static TarArchiveOutputStream getTar(FileSystem fileSystem, String outputPath) throws IOException {
|
||||||
Path hdfsWritePath = new Path(outputPath);
|
Path hdfsWritePath = new Path(outputPath);
|
||||||
FSDataOutputStream fsDataOutputStream = null;
|
FSDataOutputStream fsDataOutputStream = null;
|
||||||
if (fileSystem.exists(hdfsWritePath)) {
|
if (fileSystem.exists(hdfsWritePath)) {
|
||||||
fileSystem.delete(hdfsWritePath, true);
|
fileSystem.delete(hdfsWritePath, true);
|
||||||
|
|
||||||
}
|
}
|
||||||
fsDataOutputStream = fileSystem.create(hdfsWritePath);
|
fsDataOutputStream = fileSystem.create(hdfsWritePath);
|
||||||
|
|
||||||
return new TarArchiveOutputStream(fsDataOutputStream.getWrappedStream());
|
return new TarArchiveOutputStream(fsDataOutputStream.getWrappedStream());
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void write(FileSystem fileSystem, String inputPath, String outputPath, String dir_name)
|
private static void write(FileSystem fileSystem, String inputPath, String outputPath, String dir_name)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
||||||
Path hdfsWritePath = new Path(outputPath);
|
Path hdfsWritePath = new Path(outputPath);
|
||||||
FSDataOutputStream fsDataOutputStream = null;
|
FSDataOutputStream fsDataOutputStream = null;
|
||||||
if (fileSystem.exists(hdfsWritePath)) {
|
if (fileSystem.exists(hdfsWritePath)) {
|
||||||
fileSystem.delete(hdfsWritePath, true);
|
fileSystem.delete(hdfsWritePath, true);
|
||||||
|
|
||||||
}
|
}
|
||||||
fsDataOutputStream = fileSystem.create(hdfsWritePath);
|
fsDataOutputStream = fileSystem.create(hdfsWritePath);
|
||||||
|
|
||||||
TarArchiveOutputStream ar = new TarArchiveOutputStream(fsDataOutputStream.getWrappedStream());
|
TarArchiveOutputStream ar = new TarArchiveOutputStream(fsDataOutputStream.getWrappedStream());
|
||||||
|
|
||||||
RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem
|
RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem
|
||||||
.listFiles(
|
.listFiles(
|
||||||
new Path(inputPath), true);
|
new Path(inputPath), true);
|
||||||
|
|
||||||
while (fileStatusListIterator.hasNext()) {
|
while (fileStatusListIterator.hasNext()) {
|
||||||
writeCurrentFile(fileSystem, dir_name, fileStatusListIterator, ar, 0);
|
writeCurrentFile(fileSystem, dir_name, fileStatusListIterator, ar, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
ar.close();
|
ar.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void tarMaxSize(FileSystem fileSystem, String inputPath, String outputPath, String dir_name,
|
public static void tarMaxSize(FileSystem fileSystem, String inputPath, String outputPath, String dir_name,
|
||||||
int gBperSplit) throws IOException {
|
int gBperSplit) throws IOException {
|
||||||
final long bytesPerSplit = 1024L * 1024L * 1024L * gBperSplit;
|
final long bytesPerSplit = 1024L * 1024L * 1024L * gBperSplit;
|
||||||
|
|
||||||
long sourceSize = fileSystem.getContentSummary(new Path(inputPath)).getSpaceConsumed();
|
long sourceSize = fileSystem.getContentSummary(new Path(inputPath)).getSpaceConsumed();
|
||||||
|
|
||||||
if (sourceSize < bytesPerSplit) {
|
if (sourceSize < bytesPerSplit) {
|
||||||
write(fileSystem, inputPath, outputPath + ".tar", dir_name);
|
write(fileSystem, inputPath, outputPath + ".tar", dir_name);
|
||||||
} else {
|
} else {
|
||||||
int partNum = 0;
|
int partNum = 0;
|
||||||
|
|
||||||
RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem
|
RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem
|
||||||
.listFiles(
|
.listFiles(
|
||||||
new Path(inputPath), true);
|
new Path(inputPath), true);
|
||||||
boolean next = fileStatusListIterator.hasNext();
|
boolean next = fileStatusListIterator.hasNext();
|
||||||
while (next) {
|
while (next) {
|
||||||
TarArchiveOutputStream ar = getTar(fileSystem, outputPath + "_" + (partNum + 1) + ".tar");
|
TarArchiveOutputStream ar = getTar(fileSystem, outputPath + "_" + (partNum + 1) + ".tar");
|
||||||
|
|
||||||
long current_size = 0;
|
long current_size = 0;
|
||||||
while (next && current_size < bytesPerSplit) {
|
while (next && current_size < bytesPerSplit) {
|
||||||
current_size = writeCurrentFile(fileSystem, dir_name, fileStatusListIterator, ar, current_size);
|
current_size = writeCurrentFile(fileSystem, dir_name, fileStatusListIterator, ar, current_size);
|
||||||
next = fileStatusListIterator.hasNext();
|
next = fileStatusListIterator.hasNext();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
partNum += 1;
|
partNum += 1;
|
||||||
ar.close();
|
ar.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static long writeCurrentFile(FileSystem fileSystem, String dir_name,
|
private static long writeCurrentFile(FileSystem fileSystem, String dir_name,
|
||||||
RemoteIterator<LocatedFileStatus> fileStatusListIterator,
|
RemoteIterator<LocatedFileStatus> fileStatusListIterator,
|
||||||
TarArchiveOutputStream ar, long current_size) throws IOException {
|
TarArchiveOutputStream ar, long current_size) throws IOException {
|
||||||
LocatedFileStatus fileStatus = fileStatusListIterator.next();
|
LocatedFileStatus fileStatus = fileStatusListIterator.next();
|
||||||
|
|
||||||
Path p = fileStatus.getPath();
|
Path p = fileStatus.getPath();
|
||||||
String p_string = p.toString();
|
String p_string = p.toString();
|
||||||
if (!p_string.endsWith("_SUCCESS")) {
|
if (!p_string.endsWith("_SUCCESS")) {
|
||||||
String name = p_string.substring(p_string.lastIndexOf("/") + 1);
|
String name = p_string.substring(p_string.lastIndexOf("/") + 1);
|
||||||
if (name.trim().equalsIgnoreCase("communities_infrastructures")) {
|
if (name.trim().equalsIgnoreCase("communities_infrastructures")) {
|
||||||
name = "communities_infrastructures.json";
|
name = "communities_infrastructures.json";
|
||||||
}
|
}
|
||||||
TarArchiveEntry entry = new TarArchiveEntry(dir_name + "/" + name);
|
TarArchiveEntry entry = new TarArchiveEntry(dir_name + "/" + name);
|
||||||
entry.setSize(fileStatus.getLen());
|
entry.setSize(fileStatus.getLen());
|
||||||
current_size += fileStatus.getLen();
|
current_size += fileStatus.getLen();
|
||||||
ar.putArchiveEntry(entry);
|
ar.putArchiveEntry(entry);
|
||||||
|
|
||||||
InputStream is = fileSystem.open(fileStatus.getPath());
|
InputStream is = fileSystem.open(fileStatus.getPath());
|
||||||
|
|
||||||
BufferedInputStream bis = new BufferedInputStream(is);
|
|
||||||
|
|
||||||
int count;
|
|
||||||
byte data[] = new byte[1024];
|
|
||||||
while ((count = bis.read(data, 0, data.length)) != -1) {
|
|
||||||
ar.write(data, 0, count);
|
|
||||||
}
|
|
||||||
bis.close();
|
|
||||||
ar.closeArchiveEntry();
|
|
||||||
|
|
||||||
}
|
|
||||||
return current_size;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
BufferedInputStream bis = new BufferedInputStream(is);
|
||||||
|
|
||||||
|
int count;
|
||||||
|
byte data[] = new byte[1024];
|
||||||
|
while ((count = bis.read(data, 0, data.length)) != -1) {
|
||||||
|
ar.write(data, 0, count);
|
||||||
|
}
|
||||||
|
bis.close();
|
||||||
|
ar.closeArchiveEntry();
|
||||||
|
|
||||||
|
}
|
||||||
|
return current_size;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,7 +9,6 @@ import org.junit.jupiter.api.BeforeAll;
|
||||||
import org.junit.jupiter.api.Disabled;
|
import org.junit.jupiter.api.Disabled;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
|
||||||
@Disabled
|
@Disabled
|
||||||
public class HttpConnectorTest {
|
public class HttpConnectorTest {
|
||||||
|
|
||||||
|
|
|
@ -109,20 +109,20 @@ public class CleaningFunctions {
|
||||||
}
|
}
|
||||||
if (Objects.nonNull(r.getPid())) {
|
if (Objects.nonNull(r.getPid())) {
|
||||||
r
|
r
|
||||||
.setPid(
|
.setPid(
|
||||||
r
|
r
|
||||||
.getPid()
|
.getPid()
|
||||||
.stream()
|
.stream()
|
||||||
.filter(Objects::nonNull)
|
.filter(Objects::nonNull)
|
||||||
.filter(sp -> StringUtils.isNotBlank(StringUtils.trim(sp.getValue())))
|
.filter(sp -> StringUtils.isNotBlank(StringUtils.trim(sp.getValue())))
|
||||||
.filter(sp -> NONE.equalsIgnoreCase(sp.getValue()))
|
.filter(sp -> NONE.equalsIgnoreCase(sp.getValue()))
|
||||||
.filter(sp -> Objects.nonNull(sp.getQualifier()))
|
.filter(sp -> Objects.nonNull(sp.getQualifier()))
|
||||||
.filter(sp -> StringUtils.isNotBlank(sp.getQualifier().getClassid()))
|
.filter(sp -> StringUtils.isNotBlank(sp.getQualifier().getClassid()))
|
||||||
.map(sp -> {
|
.map(sp -> {
|
||||||
sp.setValue(StringUtils.trim(sp.getValue()));
|
sp.setValue(StringUtils.trim(sp.getValue()));
|
||||||
return sp;
|
return sp;
|
||||||
})
|
})
|
||||||
.collect(Collectors.toList()));
|
.collect(Collectors.toList()));
|
||||||
}
|
}
|
||||||
if (Objects.isNull(r.getResourcetype()) || StringUtils.isBlank(r.getResourcetype().getClassid())) {
|
if (Objects.isNull(r.getResourcetype()) || StringUtils.isBlank(r.getResourcetype().getClassid())) {
|
||||||
r
|
r
|
||||||
|
|
|
@ -1,18 +1,18 @@
|
||||||
<workflow-app name="dump_community_products" xmlns="uri:oozie:workflow:0.5">
|
<workflow-app name="dump_community_products" xmlns="uri:oozie:workflow:0.5">
|
||||||
|
|
||||||
<parameters>
|
<parameters>
|
||||||
<property>
|
<property>
|
||||||
<name>sourcePath</name>
|
<name>sourcePath</name>
|
||||||
<description>the source path</description>
|
<description>the source path</description>
|
||||||
</property>
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>isLookUpUrl</name>
|
<name>isLookUpUrl</name>
|
||||||
<description>the isLookup service endpoint</description>
|
<description>the isLookup service endpoint</description>
|
||||||
</property>
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>outputPath</name>
|
<name>outputPath</name>
|
||||||
<description>the output path</description>
|
<description>the output path</description>
|
||||||
</property>
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>accessToken</name>
|
<name>accessToken</name>
|
||||||
<description>the access token used for the deposition in Zenodo</description>
|
<description>the access token used for the deposition in Zenodo</description>
|
||||||
|
@ -320,6 +320,7 @@
|
||||||
<ok to="join_extend"/>
|
<ok to="join_extend"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
||||||
<action name="extend_orp">
|
<action name="extend_orp">
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
<master>yarn</master>
|
<master>yarn</master>
|
||||||
|
@ -344,6 +345,7 @@
|
||||||
<ok to="join_extend"/>
|
<ok to="join_extend"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
||||||
<action name="extend_software">
|
<action name="extend_software">
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
<master>yarn</master>
|
<master>yarn</master>
|
||||||
|
@ -371,43 +373,42 @@
|
||||||
|
|
||||||
<join name="join_extend" to="splitForCommunities"/>
|
<join name="join_extend" to="splitForCommunities"/>
|
||||||
|
|
||||||
<action name="splitForCommunities">
|
<action name="splitForCommunities">
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
<master>yarn</master>
|
<master>yarn</master>
|
||||||
<mode>cluster</mode>
|
<mode>cluster</mode>
|
||||||
<name>Split dumped result for community</name>
|
<name>Split dumped result for community</name>
|
||||||
<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkSplitForCommunity</class>
|
<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkSplitForCommunity</class>
|
||||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||||
<spark-opts>
|
<spark-opts>
|
||||||
--executor-memory=${sparkExecutorMemory}
|
--executor-memory=${sparkExecutorMemory}
|
||||||
--executor-cores=${sparkExecutorCores}
|
--executor-cores=${sparkExecutorCores}
|
||||||
--driver-memory=${sparkDriverMemory}
|
--driver-memory=${sparkDriverMemory}
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--sourcePath</arg><arg>${workingDir}/ext</arg>
|
<arg>--sourcePath</arg><arg>${workingDir}/ext</arg>
|
||||||
<arg>--outputPath</arg><arg>${workingDir}/split</arg>
|
<arg>--outputPath</arg><arg>${workingDir}/split</arg>
|
||||||
<arg>--communityMapPath</arg><arg>${workingDir}/communityMap</arg>
|
<arg>--communityMapPath</arg><arg>${workingDir}/communityMap</arg>
|
||||||
</spark>
|
</spark>
|
||||||
<ok to="make_archive"/>
|
<ok to="make_archive"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
||||||
<action name="make_archive">
|
<action name="make_archive">
|
||||||
<java>
|
<java>
|
||||||
<main-class>eu.dnetlib.dhp.oa.graph.dump.MakeTar</main-class>
|
<main-class>eu.dnetlib.dhp.oa.graph.dump.MakeTar</main-class>
|
||||||
<arg>--hdfsPath</arg><arg>${outputPath}</arg>
|
<arg>--hdfsPath</arg><arg>${outputPath}</arg>
|
||||||
<arg>--nameNode</arg><arg>${nameNode}</arg>
|
<arg>--nameNode</arg><arg>${nameNode}</arg>
|
||||||
<arg>--sourcePath</arg><arg>${workingDir}/split</arg>
|
<arg>--sourcePath</arg><arg>${workingDir}/split</arg>
|
||||||
</java>
|
</java>
|
||||||
<ok to="send_zenodo"/>
|
<ok to="send_zenodo"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
||||||
|
|
||||||
<action name="send_zenodo">
|
<action name="send_zenodo">
|
||||||
<java>
|
<java>
|
||||||
<main-class>eu.dnetlib.dhp.oa.graph.dump.SendToZenodoHDFS</main-class>
|
<main-class>eu.dnetlib.dhp.oa.graph.dump.SendToZenodoHDFS</main-class>
|
||||||
|
@ -424,8 +425,6 @@
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
<end name="End"/>
|
<end name="End"/>
|
||||||
|
|
||||||
</workflow-app>
|
</workflow-app>
|
|
@ -1,18 +1,18 @@
|
||||||
<workflow-app name="dump_whole_graph" xmlns="uri:oozie:workflow:0.5">
|
<workflow-app name="dump_whole_graph" xmlns="uri:oozie:workflow:0.5">
|
||||||
|
|
||||||
<parameters>
|
<parameters>
|
||||||
<property>
|
<property>
|
||||||
<name>sourcePath</name>
|
<name>sourcePath</name>
|
||||||
<description>the source path</description>
|
<description>the source path</description>
|
||||||
</property>
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>isLookUpUrl</name>
|
<name>isLookUpUrl</name>
|
||||||
<description>the isLookup service endpoint</description>
|
<description>the isLookup service endpoint</description>
|
||||||
</property>
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>outputPath</name>
|
<name>outputPath</name>
|
||||||
<description>the output path</description>
|
<description>the output path</description>
|
||||||
</property>
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>resultAggregation</name>
|
<name>resultAggregation</name>
|
||||||
<description>true if all the result type have to be dumped under result. false otherwise</description>
|
<description>true if all the result type have to be dumped under result. false otherwise</description>
|
||||||
|
@ -357,10 +357,8 @@
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
||||||
|
|
||||||
<join name="join_dump" to="fork_context"/>
|
<join name="join_dump" to="fork_context"/>
|
||||||
|
|
||||||
|
|
||||||
<fork name="fork_context">
|
<fork name="fork_context">
|
||||||
<path start="create_entities_fromcontext"/>
|
<path start="create_entities_fromcontext"/>
|
||||||
<path start="create_relation_fromcontext"/>
|
<path start="create_relation_fromcontext"/>
|
||||||
|
@ -389,7 +387,6 @@
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
||||||
|
|
||||||
<action name="create_relation_fromorgs">
|
<action name="create_relation_fromorgs">
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
<master>yarn</master>
|
<master>yarn</master>
|
||||||
|
@ -418,7 +415,6 @@
|
||||||
|
|
||||||
<join name="join_context" to="fork_extract_relations"/>
|
<join name="join_context" to="fork_extract_relations"/>
|
||||||
|
|
||||||
|
|
||||||
<fork name="fork_extract_relations">
|
<fork name="fork_extract_relations">
|
||||||
<path start="rels_from_pubs"/>
|
<path start="rels_from_pubs"/>
|
||||||
<path start="rels_from_dats"/>
|
<path start="rels_from_dats"/>
|
||||||
|
@ -530,7 +526,6 @@
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
||||||
|
|
||||||
<join name="join_extract_relations" to="collect_and_save"/>
|
<join name="join_extract_relations" to="collect_and_save"/>
|
||||||
|
|
||||||
<action name="collect_and_save">
|
<action name="collect_and_save">
|
||||||
|
@ -568,8 +563,7 @@
|
||||||
<ok to="send_zenodo"/>
|
<ok to="send_zenodo"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
||||||
|
|
||||||
<action name="send_zenodo">
|
<action name="send_zenodo">
|
||||||
<java>
|
<java>
|
||||||
<main-class>eu.dnetlib.dhp.oa.graph.dump.SendToZenodoHDFS</main-class>
|
<main-class>eu.dnetlib.dhp.oa.graph.dump.SendToZenodoHDFS</main-class>
|
||||||
|
|
|
@ -6,11 +6,36 @@ import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary
|
||||||
import eu.dnetlib.dhp.schema.oaf.Relation
|
import eu.dnetlib.dhp.schema.oaf.Relation
|
||||||
import org.apache.commons.io.IOUtils
|
import org.apache.commons.io.IOUtils
|
||||||
import org.apache.spark.SparkConf
|
import org.apache.spark.SparkConf
|
||||||
|
import org.apache.spark.sql.expressions.Aggregator
|
||||||
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
|
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
|
||||||
|
|
||||||
object SparkGenerateScholixIndex {
|
object SparkGenerateScholixIndex {
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def getScholixAggregator(): Aggregator[(String, Scholix), Scholix, Scholix] = new Aggregator[(String, Scholix), Scholix, Scholix]{
|
||||||
|
|
||||||
|
override def zero: Scholix = new Scholix()
|
||||||
|
|
||||||
|
override def reduce(b: Scholix, a: (String, Scholix)): Scholix = {
|
||||||
|
b.mergeFrom(a._2)
|
||||||
|
b
|
||||||
|
}
|
||||||
|
|
||||||
|
override def merge(wx: Scholix, wy: Scholix): Scholix = {
|
||||||
|
wx.mergeFrom(wy)
|
||||||
|
wx
|
||||||
|
}
|
||||||
|
override def finish(reduction: Scholix): Scholix = reduction
|
||||||
|
|
||||||
|
override def bufferEncoder: Encoder[Scholix] =
|
||||||
|
Encoders.kryo(classOf[Scholix])
|
||||||
|
|
||||||
|
override def outputEncoder: Encoder[Scholix] =
|
||||||
|
Encoders.kryo(classOf[Scholix])
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
def main(args: Array[String]): Unit = {
|
def main(args: Array[String]): Unit = {
|
||||||
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkGenerateScholixIndex.getClass.getResourceAsStream("/eu/dnetlib/dhp/provision/input_generate_summary_parameters.json")))
|
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkGenerateScholixIndex.getClass.getResourceAsStream("/eu/dnetlib/dhp/provision/input_generate_summary_parameters.json")))
|
||||||
parser.parseArgument(args)
|
parser.parseArgument(args)
|
||||||
|
@ -40,7 +65,7 @@ object SparkGenerateScholixIndex {
|
||||||
|
|
||||||
(relation.getTarget, Scholix.generateScholixWithSource(summary,relation))
|
(relation.getTarget, Scholix.generateScholixWithSource(summary,relation))
|
||||||
|
|
||||||
}).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/scholix_source")
|
}).repartition(6000).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/scholix_source")
|
||||||
|
|
||||||
val sTarget:Dataset[(String,Scholix)] = spark.read.load(s"$workingDirPath/scholix_source").as[(String, Scholix)]
|
val sTarget:Dataset[(String,Scholix)] = spark.read.load(s"$workingDirPath/scholix_source").as[(String, Scholix)]
|
||||||
|
|
||||||
|
@ -53,9 +78,16 @@ object SparkGenerateScholixIndex {
|
||||||
scholix.generateIdentifier()
|
scholix.generateIdentifier()
|
||||||
scholix.generatelinkPublisher()
|
scholix.generatelinkPublisher()
|
||||||
scholix
|
scholix
|
||||||
}).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/scholix")
|
}).repartition(6000).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/scholix_r")
|
||||||
|
|
||||||
|
|
||||||
|
val finalScholix:Dataset[Scholix] = spark.read.load(s"$workingDirPath/scholix_r").as[Scholix]
|
||||||
|
|
||||||
|
finalScholix.map(d => (d.getIdentifier, d))(Encoders.tuple(Encoders.STRING, scholixEncoder))
|
||||||
|
.groupByKey(_._1)(Encoders.STRING)
|
||||||
|
.agg(getScholixAggregator().toColumn)
|
||||||
|
.map(p => p._2)
|
||||||
|
.write.mode(SaveMode.Overwrite).save(s"$workingDirPath/scholix")
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,6 +5,8 @@ import java.io.Serializable;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary;
|
import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary;
|
||||||
|
@ -91,13 +93,91 @@ public class Scholix implements Serializable {
|
||||||
s.setSource(ScholixResource.fromSummary(scholixSummary));
|
s.setSource(ScholixResource.fromSummary(scholixSummary));
|
||||||
|
|
||||||
s.setIdentifier(rel.getTarget());
|
s.setIdentifier(rel.getTarget());
|
||||||
// ScholixResource mockTarget = new ScholixResource();
|
|
||||||
// mockTarget.setDnetIdentifier(rel.getTarget());
|
|
||||||
// s.setTarget(mockTarget);
|
|
||||||
// s.generateIdentifier();
|
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private List<ScholixEntityId> mergeScholixEntityId(final List<ScholixEntityId> a, final List<ScholixEntityId> b) {
|
||||||
|
final List<ScholixEntityId> m = new ArrayList<>(a);
|
||||||
|
if (b != null)
|
||||||
|
b.forEach(s -> {
|
||||||
|
int tt = (int) m.stream().filter(t -> t.getName().equalsIgnoreCase(s.getName())).count();
|
||||||
|
if (tt == 0) {
|
||||||
|
m.add(s);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return m;
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<ScholixIdentifier> mergeScholixIdnetifier(final List<ScholixIdentifier> a,
|
||||||
|
final List<ScholixIdentifier> b) {
|
||||||
|
final List<ScholixIdentifier> m = new ArrayList<>(a);
|
||||||
|
if (b != null)
|
||||||
|
b.forEach(s -> {
|
||||||
|
int tt = (int) m.stream().filter(t -> t.getIdentifier().equalsIgnoreCase(s.getIdentifier())).count();
|
||||||
|
if (tt == 0) {
|
||||||
|
m.add(s);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return m;
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<ScholixCollectedFrom> mergeScholixCollectedFrom(final List<ScholixCollectedFrom> a,
|
||||||
|
final List<ScholixCollectedFrom> b) {
|
||||||
|
final List<ScholixCollectedFrom> m = new ArrayList<>(a);
|
||||||
|
if (b != null)
|
||||||
|
b.forEach(s -> {
|
||||||
|
int tt = (int) m
|
||||||
|
.stream()
|
||||||
|
.filter(t -> t.getProvider().getName().equalsIgnoreCase(s.getProvider().getName()))
|
||||||
|
.count();
|
||||||
|
if (tt == 0) {
|
||||||
|
m.add(s);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return m;
|
||||||
|
}
|
||||||
|
|
||||||
|
private ScholixRelationship mergeRelationships(final ScholixRelationship a, final ScholixRelationship b) {
|
||||||
|
ScholixRelationship result = new ScholixRelationship();
|
||||||
|
result.setName(StringUtils.isEmpty(a.getName()) ? b.getName() : a.getName());
|
||||||
|
result.setInverse(StringUtils.isEmpty(a.getInverse()) ? b.getInverse() : a.getInverse());
|
||||||
|
result.setSchema(StringUtils.isEmpty(a.getSchema()) ? b.getSchema() : a.getSchema());
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
private ScholixResource mergeResource(final ScholixResource a, final ScholixResource b) {
|
||||||
|
|
||||||
|
final ScholixResource result = new ScholixResource();
|
||||||
|
result.setCollectedFrom(mergeScholixCollectedFrom(a.getCollectedFrom(), b.getCollectedFrom()));
|
||||||
|
result.setCreator(mergeScholixEntityId(a.getCreator(), b.getCreator()));
|
||||||
|
result
|
||||||
|
.setDnetIdentifier(
|
||||||
|
StringUtils.isBlank(a.getDnetIdentifier()) ? b.getDnetIdentifier() : a.getDnetIdentifier());
|
||||||
|
result.setIdentifier(mergeScholixIdnetifier(a.getIdentifier(), b.getIdentifier()));
|
||||||
|
result.setObjectType(StringUtils.isNotBlank(a.getObjectType()) ? a.getObjectType() : b.getObjectType());
|
||||||
|
result
|
||||||
|
.setObjectSubType(
|
||||||
|
StringUtils.isNotBlank(a.getObjectSubType()) ? a.getObjectSubType() : b.getObjectSubType());
|
||||||
|
result.setPublisher(mergeScholixEntityId(a.getPublisher(), b.getPublisher()));
|
||||||
|
result
|
||||||
|
.setPublicationDate(
|
||||||
|
StringUtils.isNotBlank(a.getPublicationDate()) ? a.getPublicationDate() : b.getPublicationDate());
|
||||||
|
result.setTitle(StringUtils.isNotBlank(a.getTitle()) ? a.getTitle() : b.getTitle());
|
||||||
|
return result;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public void mergeFrom(final Scholix other) {
|
||||||
|
linkprovider = mergeScholixEntityId(linkprovider, other.getLinkprovider());
|
||||||
|
publisher = mergeScholixEntityId(publisher, other.getPublisher());
|
||||||
|
if (StringUtils.isEmpty(publicationDate))
|
||||||
|
publicationDate = other.getPublicationDate();
|
||||||
|
relationship = mergeRelationships(relationship, other.getRelationship());
|
||||||
|
source = mergeResource(source, other.getSource());
|
||||||
|
target = mergeResource(target, other.getTarget());
|
||||||
|
generateIdentifier();
|
||||||
|
}
|
||||||
|
|
||||||
public void generatelinkPublisher() {
|
public void generatelinkPublisher() {
|
||||||
Set<String> publisher = new HashSet<>();
|
Set<String> publisher = new HashSet<>();
|
||||||
if (source.getPublisher() != null)
|
if (source.getPublisher() != null)
|
||||||
|
|
|
@ -108,7 +108,7 @@
|
||||||
<arg>-m</arg> <arg>yarn-cluster</arg>
|
<arg>-m</arg> <arg>yarn-cluster</arg>
|
||||||
<arg>--workingPath</arg><arg>${workingDirPath}</arg>
|
<arg>--workingPath</arg><arg>${workingDirPath}</arg>
|
||||||
</spark>
|
</spark>
|
||||||
<ok to="DropAndCreateIndex"/>
|
<ok to="End"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue