Compare commits
27 Commits
75551ad4ec ... c8a88b2187

Author | SHA1
---|---
Miriam Baglioni | c8a88b2187
Claudio Atzori | 26b97aa5ed
Lampros Smyrnaios | b7c8acc563
Michele Artini | 71d6e02886
Michele Artini | 02c9a311c8
Miriam Baglioni | 42846d3b91
Miriam Baglioni | 4f0a044245
Miriam Baglioni | 4bb504e693
Serafeim Chatzopoulos | cbe13a5c61
Miriam Baglioni | 9c9a9562ae
Miriam Baglioni | 2c4440951f
Miriam Baglioni | b42bdd5fb3
Miriam Baglioni | 64cbd8abe9
Antonis Lempesis | df6e3bda04
Antonis Lempesis | 573b081f1d
Serafeim Chatzopoulos | 0eb0701b26
Antonis Lempesis | 0bf2a7a359
Claudio Atzori | 24227ab598
Antonis Lempesis | 9ff44eed96
Claudio Atzori | cff6040424
Antonis Lempesis | 1fee4124e0
Claudio Atzori | 9e700a8b0d
Lampros Smyrnaios | 036ba03fcd
Miriam Baglioni | a418dacb47
Miriam Baglioni | e9131f4e4a
Miriam Baglioni | 4c9bc4c3a5
Miriam Baglioni | 55ea485783
@@ -67,6 +67,9 @@ public class PrepareAffiliationRelations implements Serializable {
 		final String openapcInputPath = parser.get("openapcInputPath");
 		log.info("openapcInputPath: {}", openapcInputPath);
 
+		final String dataciteInputPath = parser.get("dataciteInputPath");
+		log.info("dataciteInputPath: {}", dataciteInputPath);
+
 		final String outputPath = parser.get("outputPath");
 		log.info("outputPath: {}", outputPath);
 
@@ -93,9 +96,15 @@ public class PrepareAffiliationRelations implements Serializable {
 		JavaPairRDD<Text, Text> openAPCRelations = prepareAffiliationRelations(
 			spark, openapcInputPath, collectedFromOpenAPC);
 
+		List<KeyValue> collectedFromDatacite = OafMapperUtils
+			.listKeyValues(ModelConstants.DATACITE_ID, "Datacite");
+		JavaPairRDD<Text, Text> dataciteRelations = prepareAffiliationRelations(
+			spark, dataciteInputPath, collectedFromDatacite);
+
 		crossrefRelations
 			.union(pubmedRelations)
 			.union(openAPCRelations)
+			.union(dataciteRelations)
 			.saveAsHadoopFile(
 				outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
 
@@ -88,7 +88,7 @@ public class CreateActionSetSparkJob implements Serializable {
 	private static void extractContent(SparkSession spark, String inputPath, String outputPath) {
 
 		getTextTextJavaPairRDD(spark, inputPath)
-			.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);// , GzipCodec.class);
+			.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
 	}
 
 	private static JavaPairRDD<Text, Text> getTextTextJavaPairRDD(SparkSession spark, String inputPath) {
@@ -5,6 +5,7 @@ import static eu.dnetlib.dhp.actionmanager.Constants.*;
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
@@ -13,7 +14,9 @@ import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FilterFunction;
 import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.function.MapGroupsFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SaveMode;
@@ -68,18 +71,59 @@ public class SparkAtomicActionUsageJob implements Serializable {
 
 		final String workingPath = parser.get("workingPath");
 
+		final String datasourcePath = parser.get("datasourcePath");
+
 		runWithSparkHiveSession(
 			conf,
 			isSparkSessionManaged,
 			spark -> {
 				removeOutputDir(spark, outputPath);
-				prepareData(dbname, spark, workingPath + "/usageDb", "usage_stats", "result_id");
+				prepareResultData(
+					dbname, spark, workingPath + "/usageDb",
+					"usage_stats",
+					"result_id",
+					"repository_id",
+					datasourcePath);
 				prepareData(dbname, spark, workingPath + "/projectDb", "project_stats", "id");
 				prepareData(dbname, spark, workingPath + "/datasourceDb", "datasource_stats", "repository_id");
 				writeActionSet(spark, workingPath, outputPath);
 			});
 	}
 
+	private static void prepareResultData(String dbname, SparkSession spark, String workingPath, String tableName,
+		String resultAttributeName, String datasourceAttributeName,
+		String datasourcePath) {
+		Dataset<UsageStatsResultModel> resultModel = spark
+			.sql(
+				String
+					.format(
+						"select %s as id, %s as datasourceId, sum(downloads) as downloads, sum(views) as views " +
+							"from %s.%s group by %s, %s",
+						resultAttributeName, datasourceAttributeName, dbname, tableName, resultAttributeName,
+						datasourceAttributeName))
+			.as(Encoders.bean(UsageStatsResultModel.class));
+		Dataset<Datasource> datasource = readPath(spark, datasourcePath, Datasource.class)
+			.filter((FilterFunction<Datasource>) d -> !d.getDataInfo().getDeletedbyinference())
+			.map((MapFunction<Datasource, Datasource>) d -> {
+				d.setId(d.getId().substring(3));
+				return d;
+			}, Encoders.bean(Datasource.class));
+		resultModel
+			.joinWith(datasource, resultModel.col("datasourceId").equalTo(datasource.col("id")), "left")
+			.map((MapFunction<Tuple2<UsageStatsResultModel, Datasource>, UsageStatsResultModel>) t2 -> {
+				UsageStatsResultModel usrm = t2._1();
+				if(Optional.ofNullable(t2._2()).isPresent())
+					usrm.setDatasourceId(usrm.getDatasourceId() + "||" + t2._2().getOfficialname().getValue());
+				else
+					usrm.setDatasourceId(usrm.getDatasourceId() + "||NO_MATCH_FOUND");
+				return usrm;
+			}, Encoders.bean(UsageStatsResultModel.class))
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(workingPath);
+	}
+
 	private static void prepareData(String dbname, SparkSession spark, String workingPath, String tableName,
 		String attribute_name) {
 		spark
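The left join above decorates each usage row's datasourceId with the datasource's official name, falling back to a NO_MATCH_FOUND marker when the join misses. A minimal, self-contained sketch of that decoration rule (plain Java; a stand-in for the Spark map over the joined Tuple2, where the second element may be null):

	// Sketch only: mirrors the map() in prepareResultData above.
	static String decorateDatasourceId(String datasourceId, String officialName) {
		return officialName != null
			? datasourceId + "||" + officialName
			: datasourceId + "||NO_MATCH_FOUND";
	}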
@@ -115,15 +159,62 @@ public class SparkAtomicActionUsageJob implements Serializable {
 
 	}
 
+	public static Measure newMeasureInstance(String id) {
+		Measure m = new Measure();
+		m.setId(id);
+		m.setUnit(new ArrayList<>());
+		return m;
+	}
+
 	private static Dataset<Result> getFinalIndicatorsResult(SparkSession spark, String inputPath) {
 
-		return readPath(spark, inputPath, UsageStatsModel.class)
-			.map((MapFunction<UsageStatsModel, Result>) usm -> {
+		return readPath(spark, inputPath, UsageStatsResultModel.class)
+			.groupByKey((MapFunction<UsageStatsResultModel, String>) usm -> usm.getId(), Encoders.STRING())
+			.mapGroups((MapGroupsFunction<String, UsageStatsResultModel, Result>) (k, it) -> {
 				Result r = new Result();
-				r.setId("50|" + usm.getId());
-				r.setMeasures(getMeasure(usm.getDownloads(), usm.getViews()));
+				r.setId("50|" + k);
+				// id = download or view and unit = list of key value pairs
+				Measure download = newMeasureInstance("downloads");
+				Measure view = newMeasureInstance("views");
+				UsageStatsResultModel first = it.next();
+				addCountForDatasource(download, first, view);
+				it.forEachRemaining(usm -> {
+					addCountForDatasource(download, usm, view);
+				});
+				r.setMeasures(Arrays.asList(download, view));
 				return r;
-			}, Encoders.bean(Result.class));
+			}, Encoders.bean(Result.class))
+		// .map((MapFunction<UsageStatsResultModel, Result>) usm -> {
+		// Result r = new Result();
+		// r.setId("50|" + usm.getId());
+		// r.setMeasures(getMeasure(usm.getDownloads(), usm.getViews()));
+		// return r;
+		// }, Encoders.bean(Result.class));
+		;
+	}
+
+	private static void addCountForDatasource(Measure download, UsageStatsResultModel usm, Measure view) {
+		DataInfo dataInfo = OafMapperUtils
+			.dataInfo(
+				false,
+				UPDATE_DATA_INFO_TYPE,
+				true,
+				false,
+				OafMapperUtils
+					.qualifier(
+						UPDATE_MEASURE_USAGE_COUNTS_CLASS_ID,
+						UPDATE_CLASS_NAME,
+						ModelConstants.DNET_PROVENANCE_ACTIONS,
+						ModelConstants.DNET_PROVENANCE_ACTIONS),
+				"");
+		download
+			.getUnit()
+			.add(
+				OafMapperUtils
+					.newKeyValueInstance(usm.getDatasourceId(), String.valueOf(usm.getDownloads()), dataInfo));
+		view
+			.getUnit()
+			.add(OafMapperUtils.newKeyValueInstance(usm.getDatasourceId(), String.valueOf(usm.getViews()), dataInfo));
 	}
 
 	private static Dataset<Project> getFinalIndicatorsProject(SparkSession spark, String inputPath) {
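getFinalIndicatorsResult now groups the per-(result, datasource) rows by result id and folds each group into two measures, one unit per datasource. A self-contained sketch of that fold with plain collections (Row is a stand-in for UsageStatsResultModel; the values come from the new test2 fixture):

	import java.util.List;
	import java.util.Map;
	import java.util.stream.Collectors;

	public class UsageFoldSketch {
		// stand-in for UsageStatsResultModel: (result id, datasource id, downloads, views)
		record Row(String id, String datasourceId, long downloads, long views) {}

		public static void main(String[] args) {
			// the three rows the test2 fixture provides for one result id
			List<Row> group = List.of(
				new Row("dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6", "10|fake1", 0, 5),
				new Row("dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6", "10|fake2", 0, 1),
				new Row("dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6", "10|fake3", 1, 3));

			// the "downloads" measure: one key-value unit per datasource
			Map<String, Long> downloads = group
				.stream()
				.collect(Collectors.toMap(Row::datasourceId, Row::downloads));
			// the "views" measure, built the same way
			Map<String, Long> views = group
				.stream()
				.collect(Collectors.toMap(Row::datasourceId, Row::views));

			// matches testUsageStatsDb2: 1 download on 10|fake3, 5 views on 10|fake1
			System.out.println("downloads: " + downloads);
			System.out.println("views: " + views);
		}
	}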
@@ -0,0 +1,18 @@
+
+package eu.dnetlib.dhp.actionmanager.usagestats;
+
+/**
+ * @author miriam.baglioni
+ * @Date 30/06/23
+ */
+public class UsageStatsResultModel extends UsageStatsModel {
+	private String datasourceId;
+
+	public String getDatasourceId() {
+		return datasourceId;
+	}
+
+	public void setDatasourceId(String datasourceId) {
+		this.datasourceId = datasourceId;
+	}
+}
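This new model is what the reworked job writes to workingPath as gzipped JSON, one object per (result, datasource) pair. A hedged round-trip sketch against one of the new fixture lines (UsageStatsModel is assumed, per the fixtures, to carry id, downloads and views; local stubs keep the sketch self-contained):

	import com.fasterxml.jackson.databind.ObjectMapper;

	public class UsageStatsResultModelSketch {
		// local stub of UsageStatsModel, inferred from the JSON fixtures
		public static class UsageStatsModelStub {
			public String id;
			public long downloads;
			public long views;
		}

		public static class UsageStatsResultModelStub extends UsageStatsModelStub {
			public String datasourceId;
		}

		public static void main(String[] args) throws Exception {
			String line = "{\"id\":\"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6\","
				+ "\"datasourceId\":\"10|fake1\",\"downloads\":0,\"views\":5}";
			UsageStatsResultModelStub m = new ObjectMapper().readValue(line, UsageStatsResultModelStub.class);
			System.out.println(m.id + " @ " + m.datasourceId + " -> " + m.downloads + " downloads, " + m.views + " views");
		}
	}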
@@ -23,6 +23,12 @@
     "paramDescription": "the path to get the input data from OpenAPC",
     "paramRequired": true
   },
+  {
+    "paramName": "dip",
+    "paramLongName": "dataciteInputPath",
+    "paramDescription": "the path to get the input data from Datacite",
+    "paramRequired": true
+  },
   {
     "paramName": "o",
     "paramLongName": "outputPath",
@@ -34,4 +34,6 @@ oozie.wf.application.path=${oozieTopWfApplicationPath}
 crossrefInputPath=/data/bip-affiliations/crossref-data.json
 pubmedInputPath=/data/bip-affiliations/pubmed-data.json
 openapcInputPath=/data/bip-affiliations/openapc-data.json
+dataciteInputPath=/data/bip-affiliations/datacite-data.json
+
 outputPath=/tmp/crossref-affiliations-output-v5
@@ -13,6 +13,10 @@
         <name>openapcInputPath</name>
         <description>the path where to find the inferred affiliation relations from OpenAPC</description>
     </property>
+    <property>
+        <name>dataciteInputPath</name>
+        <description>the path where to find the inferred affiliation relations from Datacite</description>
+    </property>
     <property>
         <name>outputPath</name>
         <description>the path where to store the actionset</description>
@@ -107,6 +111,8 @@
             <arg>--crossrefInputPath</arg><arg>${crossrefInputPath}</arg>
             <arg>--pubmedInputPath</arg><arg>${pubmedInputPath}</arg>
             <arg>--openapcInputPath</arg><arg>${openapcInputPath}</arg>
+            <arg>--dataciteInputPath</arg><arg>${dataciteInputPath}</arg>
+
             <arg>--outputPath</arg><arg>${outputPath}</arg>
         </spark>
         <ok to="End"/>
@@ -28,5 +28,11 @@
     "paramLongName": "workingPath",
     "paramDescription": "the workingPath where to save the content of the usage_stats table",
     "paramRequired": true
+  },
+  {
+    "paramName": "dp",
+    "paramLongName": "datasourcePath",
+    "paramDescription": "the workingPath where to save the content of the usage_stats table",
+    "paramRequired": true
   }
 ]
@@ -90,6 +90,7 @@
             <arg>--outputPath</arg><arg>${outputPath}</arg>
             <arg>--usagestatsdb</arg><arg>${usagestatsdb}</arg>
             <arg>--workingPath</arg><arg>${workingDir}</arg>
+            <arg>--datasourcePath</arg><arg>${datasourcePath}</arg>
         </spark>
         <ok to="End"/>
         <error to="Kill"/>
@@ -390,6 +390,18 @@ base_dc:link (I used dc:identifier)
 					</xsl:choose>
 				</oaf:relation>
 			</xsl:for-each>
+
+			<oaf:datainfo>
+				<oaf:inferred>false</oaf:inferred>
+				<oaf:deletedbyinference>false</oaf:deletedbyinference>
+				<oaf:trust>0.89</oaf:trust>
+				<oaf:inferenceprovenance/>
+				<oaf:provenanceaction classid="sysimport:crosswalk:aggregator"
+					classname="sysimport:crosswalk:aggregator"
+					schemeid="dnet:provenanceActions"
+					schemename="dnet:provenanceActions"/>
+			</oaf:datainfo>
+
 		</metadata>
 		<xsl:copy-of select="//*[local-name() = 'about']" />
 	</record>
@@ -429,6 +429,17 @@
 					</xsl:choose>
 				</oaf:relation>
 			</xsl:for-each>
+
+			<oaf:datainfo>
+				<oaf:inferred>false</oaf:inferred>
+				<oaf:deletedbyinference>false</oaf:deletedbyinference>
+				<oaf:trust>0.89</oaf:trust>
+				<oaf:inferenceprovenance/>
+				<oaf:provenanceaction classid="sysimport:crosswalk:aggregator"
+					classname="sysimport:crosswalk:aggregator"
+					schemeid="dnet:provenanceActions"
+					schemename="dnet:provenanceActions"/>
+			</oaf:datainfo>
 		</metadata>
 		<xsl:copy-of select="//*[local-name() = 'about']" />
 	</record>
@@ -1048,5 +1048,11 @@
     "openaire_id": "re3data_____::r3d100010399",
     "datacite_name": "ZEW Forschungsdatenzentrum",
     "official_name": "ZEW Forschungsdatenzentrum"
+  },
+  "HBP.NEUROINF": {
+    "openaire_id": "fairsharing_::2975",
+    "datacite_name": "EBRAINS",
+    "official_name": "EBRAINS"
   }
+
 }
@@ -87,6 +87,7 @@ public class PrepareAffiliationRelationsTest {
 				"-crossrefInputPath", crossrefAffiliationRelationPath,
 				"-pubmedInputPath", crossrefAffiliationRelationPath,
 				"-openapcInputPath", crossrefAffiliationRelationPath,
+				"-dataciteInputPath", crossrefAffiliationRelationPath,
 				"-outputPath", outputPath
 			});
 
@@ -103,7 +104,7 @@ public class PrepareAffiliationRelationsTest {
 //		);
 //		}
 		// count the number of relations
-		assertEquals(60, tmp.count());
+		assertEquals(80, tmp.count());
 
 		Dataset<Relation> dataset = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class));
 		dataset.createOrReplaceTempView("result");
@@ -114,7 +115,7 @@ public class PrepareAffiliationRelationsTest {
 		// verify that we have equal number of bi-directional relations
 		Assertions
 			.assertEquals(
-				30, execVerification
+				40, execVerification
 					.filter(
 						"relClass='" + ModelConstants.HAS_AUTHOR_INSTITUTION + "'")
 					.collectAsList()
@@ -122,7 +123,7 @@ public class PrepareAffiliationRelationsTest {
 
 		Assertions
 			.assertEquals(
-				30, execVerification
+				40, execVerification
 					.filter(
 						"relClass='" + ModelConstants.IS_AUTHOR_INSTITUTION_OF + "'")
 					.collectAsList()
@@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import eu.dnetlib.dhp.schema.action.AtomicAction;
+import eu.dnetlib.dhp.schema.oaf.Measure;
 import eu.dnetlib.dhp.schema.oaf.OafEntity;
 import eu.dnetlib.dhp.schema.oaf.Result;
@@ -66,10 +67,380 @@ public class SparkAtomicActionCountJobTest {
 		spark.stop();
 	}
 
+	@Test
+	void testUsageStatsDb2() {
+		String usageScoresPath = getClass()
+			.getResource("/eu/dnetlib/dhp/actionmanager/usagestats/test2")
+			.getPath();
+
+		SparkAtomicActionUsageJob.writeActionSet(spark, usageScoresPath, workingDir.toString() + "/actionSet");
+
+		final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+
+		JavaRDD<AtomicAction> tmp = sc
+			.sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class)
+			.map(usm -> OBJECT_MAPPER.readValue(usm._2.getBytes(), AtomicAction.class));
+		// .map(aa -> (Result) aa.getPayload());
+
+		Assertions.assertEquals(7, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("50|")).count());
+		Assertions.assertEquals(9, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("10|")).count());
+		Assertions.assertEquals(9, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("40|")).count());
+
+		tmp.foreach(r -> Assertions.assertEquals(2, ((OafEntity) r.getPayload()).getMeasures().size()));
+		tmp
+			.foreach(
+				r -> ((OafEntity) r.getPayload())
+					.getMeasures()
+					.stream()
+					.forEach(
+						m -> m
+							.getUnit()
+							.stream()
+							.forEach(u -> Assertions.assertFalse(u.getDataInfo().getDeletedbyinference()))));
+		tmp
+			.foreach(
+				r -> ((OafEntity) r.getPayload())
+					.getMeasures()
+					.stream()
+					.forEach(
+						m -> m.getUnit().stream().forEach(u -> Assertions.assertTrue(u.getDataInfo().getInferred()))));
+		tmp
+			.foreach(
+				r -> ((OafEntity) r.getPayload())
+					.getMeasures()
+					.stream()
+					.forEach(
+						m -> m
+							.getUnit()
+							.stream()
+							.forEach(u -> Assertions.assertFalse(u.getDataInfo().getInvisible()))));
+
+		tmp
+			.foreach(
+				r -> ((OafEntity) r.getPayload())
+					.getMeasures()
+					.stream()
+					.forEach(
+						m -> m
+							.getUnit()
+							.stream()
+							.forEach(
+								u -> Assertions
+									.assertEquals(
+										"measure:usage_counts",
+										u.getDataInfo().getProvenanceaction().getClassid()))));
+		tmp
+			.foreach(
+				r -> ((OafEntity) r.getPayload())
+					.getMeasures()
+					.stream()
+					.forEach(
+						m -> m
+							.getUnit()
+							.stream()
+							.forEach(
+								u -> Assertions
+									.assertEquals(
+										"Inferred by OpenAIRE",
+										u.getDataInfo().getProvenanceaction().getClassname()))));
+
+		tmp
+			.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("40|"))
+			.foreach(
+				r -> ((OafEntity) r.getPayload())
+					.getMeasures()
+					.stream()
+					.forEach(
+						m -> m
+							.getUnit()
+							.stream()
+							.forEach(
+								u -> Assertions
+									.assertEquals(
+										"count",
+										u.getKey()))));
+
+		Assertions
+			.assertEquals(
+				1,
+				tmp
+					.filter(
+						r -> ((OafEntity) r.getPayload())
+							.getId()
+							.equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6"))
+					.count());
+
+		OafEntity entity = (OafEntity) tmp
+			.filter(
+				aa -> ((OafEntity) aa.getPayload()).getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6"))
+			.first()
+			.getPayload();
+
+		entity
+			.getMeasures()
+			.stream()
+			.forEach(
+				m -> Assertions.assertEquals(3, m.getUnit().size()));
+
+		Measure downloads = entity
+			.getMeasures()
+			.stream()
+			.filter(m -> m.getId().equals("downloads"))
+			.findFirst()
+			.get();
+
+		Assertions
+			.assertEquals(
+				String.valueOf(0),
+				downloads.getUnit().stream().filter(u -> u.getKey().equals("10|fake1")).findFirst().get().getValue());
+		Assertions
+			.assertEquals(
+				String.valueOf(0),
+				downloads.getUnit().stream().filter(u -> u.getKey().equals("10|fake2")).findFirst().get().getValue());
+		Assertions
+			.assertEquals(
+				String.valueOf(1),
+				downloads.getUnit().stream().filter(u -> u.getKey().equals("10|fake3")).findFirst().get().getValue());
+
+		Measure views = entity
+			.getMeasures()
+			.stream()
+			.filter(m -> m.getId().equals("views"))
+			.findFirst()
+			.get();
+
+		Assertions
+			.assertEquals(
+				String.valueOf(5),
+				views.getUnit().stream().filter(u -> u.getKey().equals("10|fake1")).findFirst().get().getValue());
+		Assertions
+			.assertEquals(
+				String.valueOf(1),
+				views.getUnit().stream().filter(u -> u.getKey().equals("10|fake2")).findFirst().get().getValue());
+		Assertions
+			.assertEquals(
+				String.valueOf(3),
+				views.getUnit().stream().filter(u -> u.getKey().equals("10|fake3")).findFirst().get().getValue());
+
+		tmp
+			.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("10|"))
+			.foreach(
+				r -> ((OafEntity) r.getPayload())
+					.getMeasures()
+					.stream()
+					.forEach(
+						m -> m
+							.getUnit()
+							.stream()
+							.forEach(
+								u -> Assertions
+									.assertEquals(
+										"count",
+										u.getKey()))));
+
+		Assertions
+			.assertEquals(
+				"0",
+				tmp
+					.map(r -> ((OafEntity) r.getPayload()))
+					.filter(r -> r.getId().equals("40|f1__________::53575dc69e9ace947e02d47ecd54a7a6"))
+					.collect()
+					.get(0)
+					.getMeasures()
+					.stream()
+					.filter(m -> m.getId().equals("downloads"))
+					.collect(Collectors.toList())
+					.get(0)
+					.getUnit()
+					.get(0)
+					.getValue());
+		Assertions
+			.assertEquals(
+				"5",
+				tmp
+					.map(r -> ((OafEntity) r.getPayload()))
+					.filter(r -> r.getId().equals("40|f1__________::53575dc69e9ace947e02d47ecd54a7a6"))
+					.collect()
+					.get(0)
+					.getMeasures()
+					.stream()
+					.filter(m -> m.getId().equals("views"))
+					.collect(Collectors.toList())
+					.get(0)
+					.getUnit()
+					.get(0)
+					.getValue());
+
+		Assertions
+			.assertEquals(
+				"0",
+				tmp
+					.map(r -> ((OafEntity) r.getPayload()))
+					.filter(r -> r.getId().equals("40|f11_________::17eda2ff77407538fbe5d3d719b9d1c0"))
+					.collect()
+					.get(0)
+					.getMeasures()
+					.stream()
+					.filter(m -> m.getId().equals("downloads"))
+					.collect(Collectors.toList())
+					.get(0)
+					.getUnit()
+					.get(0)
+					.getValue());
+		Assertions
+			.assertEquals(
+				"1",
+				tmp
+					.map(r -> ((OafEntity) r.getPayload()))
+					.filter(r -> r.getId().equals("40|f11_________::17eda2ff77407538fbe5d3d719b9d1c0"))
+					.collect()
+					.get(0)
+					.getMeasures()
+					.stream()
+					.filter(m -> m.getId().equals("views"))
+					.collect(Collectors.toList())
+					.get(0)
+					.getUnit()
+					.get(0)
+					.getValue());
+
+		Assertions
+			.assertEquals(
+				"2",
+				tmp
+					.map(r -> ((OafEntity) r.getPayload()))
+					.filter(r -> r.getId().equals("40|f12_________::3085e4c6e051378ca6157fe7f0430c1f"))
+					.collect()
+					.get(0)
+					.getMeasures()
+					.stream()
+					.filter(m -> m.getId().equals("downloads"))
+					.collect(Collectors.toList())
+					.get(0)
+					.getUnit()
+					.get(0)
+					.getValue());
+		Assertions
+			.assertEquals(
+				"6",
+				tmp
+					.map(r -> ((OafEntity) r.getPayload()))
+					.filter(r -> r.getId().equals("40|f12_________::3085e4c6e051378ca6157fe7f0430c1f"))
+					.collect()
+					.get(0)
+					.getMeasures()
+					.stream()
+					.filter(m -> m.getId().equals("views"))
+					.collect(Collectors.toList())
+					.get(0)
+					.getUnit()
+					.get(0)
+					.getValue());
+
+		Assertions
+			.assertEquals(
+				"0",
+				tmp
+					.map(r -> ((OafEntity) r.getPayload()))
+					.filter(r -> r.getId().equals("10|d1__________::53575dc69e9ace947e02d47ecd54a7a6"))
+					.collect()
+					.get(0)
+					.getMeasures()
+					.stream()
+					.filter(m -> m.getId().equals("downloads"))
+					.collect(Collectors.toList())
+					.get(0)
+					.getUnit()
+					.get(0)
+					.getValue());
+		Assertions
+			.assertEquals(
+				"5",
+				tmp
+					.map(r -> ((OafEntity) r.getPayload()))
+					.filter(r -> r.getId().equals("10|d1__________::53575dc69e9ace947e02d47ecd54a7a6"))
+					.collect()
+					.get(0)
+					.getMeasures()
+					.stream()
+					.filter(m -> m.getId().equals("views"))
+					.collect(Collectors.toList())
+					.get(0)
+					.getUnit()
+					.get(0)
+					.getValue());
+
+		Assertions
+			.assertEquals(
+				"0",
+				tmp
+					.map(r -> ((OafEntity) r.getPayload()))
+					.filter(r -> r.getId().equals("10|d11_________::17eda2ff77407538fbe5d3d719b9d1c0"))
+					.collect()
+					.get(0)
+					.getMeasures()
+					.stream()
+					.filter(m -> m.getId().equals("downloads"))
+					.collect(Collectors.toList())
+					.get(0)
+					.getUnit()
+					.get(0)
+					.getValue());
+		Assertions
+			.assertEquals(
+				"1",
+				tmp
+					.map(r -> ((OafEntity) r.getPayload()))
+					.filter(r -> r.getId().equals("10|d11_________::17eda2ff77407538fbe5d3d719b9d1c0"))
+					.collect()
+					.get(0)
+					.getMeasures()
+					.stream()
+					.filter(m -> m.getId().equals("views"))
+					.collect(Collectors.toList())
+					.get(0)
+					.getUnit()
+					.get(0)
+					.getValue());
+
+		Assertions
+			.assertEquals(
+				"2",
+				tmp
+					.map(r -> ((OafEntity) r.getPayload()))
+					.filter(r -> r.getId().equals("10|d12_________::3085e4c6e051378ca6157fe7f0430c1f"))
+					.collect()
+					.get(0)
+					.getMeasures()
+					.stream()
+					.filter(m -> m.getId().equals("downloads"))
+					.collect(Collectors.toList())
+					.get(0)
+					.getUnit()
+					.get(0)
+					.getValue());
+		Assertions
+			.assertEquals(
+				"6",
+				tmp
+					.map(r -> ((OafEntity) r.getPayload()))
+					.filter(r -> r.getId().equals("10|d12_________::3085e4c6e051378ca6157fe7f0430c1f"))
+					.collect()
+					.get(0)
+					.getMeasures()
+					.stream()
+					.filter(m -> m.getId().equals("views"))
+					.collect(Collectors.toList())
+					.get(0)
+					.getUnit()
+					.get(0)
+					.getValue());
+	}
+
 	@Test
 	void testMatch() {
 		String usageScoresPath = getClass()
-			.getResource("/eu/dnetlib/dhp/actionmanager/usagestats")
+			.getResource("/eu/dnetlib/dhp/actionmanager/usagestats/test1")
 			.getPath();
 
 		SparkAtomicActionUsageJob.writeActionSet(spark, usageScoresPath, workingDir.toString() + "/actionSet");
@@ -144,6 +515,39 @@ public class SparkAtomicActionCountJobTest {
 									u.getDataInfo().getProvenanceaction().getClassname()))));
 
 		tmp
+			.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("40|"))
+			.foreach(
+				r -> ((OafEntity) r.getPayload())
+					.getMeasures()
+					.stream()
+					.forEach(
+						m -> m
+							.getUnit()
+							.stream()
+							.forEach(
+								u -> Assertions
+									.assertEquals(
+										"count",
+										u.getKey()))));
+
+		tmp
+			.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("50|"))
+			.foreach(
+				r -> ((OafEntity) r.getPayload())
+					.getMeasures()
+					.stream()
+					.forEach(
+						m -> m
+							.getUnit()
+							.stream()
+							.forEach(
+								u -> Assertions
+									.assertEquals(
+										"10|fake1",
+										u.getKey()))));
+
+		tmp
+			.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("10|"))
 			.foreach(
 				r -> ((OafEntity) r.getPayload())
 					.getMeasures()
@@ -465,5 +869,4 @@ public class SparkAtomicActionCountJobTest {
 					.get(0)
 					.getValue());
 	}
-
 }
@@ -0,0 +1,9 @@
+{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","datasourceId":"10|fake1","downloads":0,"views":5}
+{"id":"doi_________::17eda2ff77407538fbe5d3d719b9d1c0","datasourceId":"10|fake1","downloads":0,"views":1}
+{"id":"doi_________::1d4dc08605fd0a2be1105d30c63bfea1","datasourceId":"10|fake1","downloads":1,"views":3}
+{"id":"doi_________::2e3527822854ca9816f6dfea5bff61a8","datasourceId":"10|fake1","downloads":1,"views":1}
+{"id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","datasourceId":"10|fake1","downloads":2,"views":6}
+{"id":"doi_________::33f710e6dd30cc5e67e35b371ddc33cf","datasourceId":"10|fake1","downloads":0,"views":1}
+{"id":"doi_________::39738ebf10654732dd3a7af9f24655f8","datasourceId":"10|fake1","downloads":1,"views":3}
+{"id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","datasourceId":"10|fake1","downloads":1,"views":10}
+{"id":"doi_________::4938a71a884dd481d329657aa543b850","datasourceId":"10|fake1","downloads":0,"views":3}
@@ -0,0 +1,9 @@
+{"id":"d1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":5}
+{"id":"d11_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1}
+{"id":"d11_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3}
+{"id":"d11_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1}
+{"id":"d12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":6}
+{"id":"d12_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1}
+{"id":"d12_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3}
+{"id":"d13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":10}
+{"id":"d13_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3}
@@ -0,0 +1,9 @@
+{"id":"f1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":5}
+{"id":"f11_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1}
+{"id":"f11_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3}
+{"id":"f11_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1}
+{"id":"f12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":6}
+{"id":"f12_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1}
+{"id":"f12_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3}
+{"id":"f13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":10}
+{"id":"f13_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3}
@@ -0,0 +1,9 @@
+{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","datasourceId":"10|fake1","downloads":0,"views":5}
+{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","datasourceId":"10|fake2","downloads":0,"views":1}
+{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","datasourceId":"10|fake3","downloads":1,"views":3}
+{"id":"doi_________::2e3527822854ca9816f6dfea5bff61a8","datasourceId":"10|fake1","downloads":1,"views":1}
+{"id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","datasourceId":"10|fake1","downloads":2,"views":6}
+{"id":"doi_________::33f710e6dd30cc5e67e35b371ddc33cf","datasourceId":"10|fake1","downloads":0,"views":1}
+{"id":"doi_________::39738ebf10654732dd3a7af9f24655f8","datasourceId":"10|fake1","downloads":1,"views":3}
+{"id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","datasourceId":"10|fake1","downloads":1,"views":10}
+{"id":"doi_________::4938a71a884dd481d329657aa543b850","datasourceId":"10|fake1","downloads":0,"views":3}
@@ -1,9 +0,0 @@
-{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":5}
-{"id":"doi_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1}
-{"id":"doi_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3}
-{"id":"doi_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1}
-{"id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":6}
-{"id":"doi_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1}
-{"id":"doi_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3}
-{"id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":10}
-{"id":"doi_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3}
@@ -9,15 +9,27 @@ fi
 export HADOOP_USER_NAME=$2
 
 IMPALA_HDFS_NODE=''
-if hdfs dfs -test -e hdfs://impala-cluster-mn1.openaire.eu >/dev/null 2>&1; then
-  IMPALA_HDFS_NODE='hdfs://impala-cluster-mn1.openaire.eu:8020'
-elif hdfs dfs -test -e hdfs://impala-cluster-mn2.openaire.eu >/dev/null 2>&1; then
-  IMPALA_HDFS_NODE='hdfs://impala-cluster-mn2.openaire.eu:8020'
-else
-  echo -e "\n\nPROBLEM WHEN SETTING THE HDFS-NODE FOR IMPALA CLUSTER!\n\n"
+COUNTER=0
+
+while [ $COUNTER -lt 3 ]; do
+  if hdfs dfs -test -e hdfs://impala-cluster-mn1.openaire.eu/tmp >/dev/null 2>&1; then
+    IMPALA_HDFS_NODE='hdfs://impala-cluster-mn1.openaire.eu:8020'
+    break
+  elif hdfs dfs -test -e hdfs://impala-cluster-mn2.openaire.eu/tmp >/dev/null 2>&1; then
+    IMPALA_HDFS_NODE='hdfs://impala-cluster-mn2.openaire.eu:8020'
+    break
+  else
+    IMPALA_HDFS_NODE=''
+    sleep 1
+  fi
+  ((COUNTER++))
+done
+
+if [ -z "$IMPALA_HDFS_NODE" ]; then
+  echo -e "\n\nPROBLEM WHEN SETTING THE HDFS-NODE FOR IMPALA CLUSTER! $COUNTER\n\n"
   exit 1
 fi
-echo "Active IMPALA HDFS Node: ${IMPALA_HDFS_NODE}"
+echo "Active IMPALA HDFS Node: ${IMPALA_HDFS_NODE} , after ${COUNTER} retries."
 
 
 function copydb() {
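The same retry-with-fallback probe is adopted by all the scripts below. Its logic, sketched in Java for clarity (node URLs as in the scripts; the isUp predicate stands in for the hdfs dfs -test -e probe):

	import java.util.List;
	import java.util.function.Predicate;

	public class HdfsNodeProbe {
		// probe each candidate for up to 3 rounds, sleeping between failed
		// rounds, and fail loudly if neither node ever responds
		static String pickHdfsNode(List<String> candidates, Predicate<String> isUp)
				throws InterruptedException {
			for (int counter = 0; counter < 3; counter++) {
				for (String node : candidates) {
					if (isUp.test(node)) {
						return node;
					}
				}
				Thread.sleep(1000); // the scripts use "sleep 1"
			}
			throw new IllegalStateException("PROBLEM WHEN SETTING THE HDFS-NODE FOR IMPALA CLUSTER!");
		}

		public static void main(String[] args) throws InterruptedException {
			String node = pickHdfsNode(
				List.of(
					"hdfs://impala-cluster-mn1.openaire.eu:8020",
					"hdfs://impala-cluster-mn2.openaire.eu:8020"),
				n -> n.endsWith(":8020")); // stand-in probe; always succeeds here
			System.out.println("Active IMPALA HDFS Node: " + node);
		}
	}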
@@ -9,15 +9,28 @@ fi
 export HADOOP_USER_NAME=$2
 
 IMPALA_HDFS_NODE=''
-if hdfs dfs -test -e hdfs://impala-cluster-mn1.openaire.eu >/dev/null 2>&1; then
-  IMPALA_HDFS_NODE='hdfs://impala-cluster-mn1.openaire.eu:8020'
-elif hdfs dfs -test -e hdfs://impala-cluster-mn2.openaire.eu >/dev/null 2>&1; then
-  IMPALA_HDFS_NODE='hdfs://impala-cluster-mn2.openaire.eu:8020'
-else
-  echo -e "\n\nPROBLEM WHEN SETTING THE HDFS-NODE FOR IMPALA CLUSTER!\n\n"
+COUNTER=0
+
+while [ $COUNTER -lt 3 ]; do
+  if hdfs dfs -test -e hdfs://impala-cluster-mn1.openaire.eu/tmp >/dev/null 2>&1; then
+    IMPALA_HDFS_NODE='hdfs://impala-cluster-mn1.openaire.eu:8020'
+    break
+  elif hdfs dfs -test -e hdfs://impala-cluster-mn2.openaire.eu/tmp >/dev/null 2>&1; then
+    IMPALA_HDFS_NODE='hdfs://impala-cluster-mn2.openaire.eu:8020'
+    break
+  else
+    IMPALA_HDFS_NODE=''
+    sleep 1
+  fi
+  ((COUNTER++))
+done
+
+if [ -z "$IMPALA_HDFS_NODE" ]; then
+  echo -e "\n\nPROBLEM WHEN SETTING THE HDFS-NODE FOR IMPALA CLUSTER! $COUNTER\n\n"
   exit 1
 fi
-echo "Active IMPALA HDFS Node: ${IMPALA_HDFS_NODE}"
+echo "Active IMPALA HDFS Node: ${IMPALA_HDFS_NODE} , after ${COUNTER} retries."
+
 
 
 function copydb() {
@@ -9,15 +9,28 @@ fi
 #export HADOOP_USER_NAME=$2
 
 IMPALA_HDFS_NODE=''
-if hdfs dfs -test -e hdfs://impala-cluster-mn1.openaire.eu >/dev/null 2>&1; then
-  IMPALA_HDFS_NODE='hdfs://impala-cluster-mn1.openaire.eu:8020'
-elif hdfs dfs -test -e hdfs://impala-cluster-mn2.openaire.eu >/dev/null 2>&1; then
-  IMPALA_HDFS_NODE='hdfs://impala-cluster-mn2.openaire.eu:8020'
-else
-  echo -e "\n\nPROBLEM WHEN SETTING THE HDFS-NODE FOR IMPALA CLUSTER!\n\n"
+COUNTER=0
+
+while [ $COUNTER -lt 3 ]; do
+  if hdfs dfs -test -e hdfs://impala-cluster-mn1.openaire.eu/tmp >/dev/null 2>&1; then
+    IMPALA_HDFS_NODE='hdfs://impala-cluster-mn1.openaire.eu:8020'
+    break
+  elif hdfs dfs -test -e hdfs://impala-cluster-mn2.openaire.eu/tmp >/dev/null 2>&1; then
+    IMPALA_HDFS_NODE='hdfs://impala-cluster-mn2.openaire.eu:8020'
+    break
+  else
+    IMPALA_HDFS_NODE=''
+    sleep 1
+  fi
+  ((COUNTER++))
+done
+
+if [ -z "$IMPALA_HDFS_NODE" ]; then
+  echo -e "\n\nPROBLEM WHEN SETTING THE HDFS-NODE FOR IMPALA CLUSTER! $COUNTER\n\n"
   exit 1
 fi
-echo "Active IMPALA HDFS Node: ${IMPALA_HDFS_NODE}"
+echo "Active IMPALA HDFS Node: ${IMPALA_HDFS_NODE} , after ${COUNTER} retries."
+
 
 
 function copydb() {
@@ -35,12 +35,20 @@ export HADOOP_USER="oozie"
 export HADOOP_USER_NAME="oozie"
 
 echo "Creating and populating impala tables"
-hive $HIVE_OPTS -e "create table ${TARGET_DB}.context (id string, name string) row format delimited fields terminated by ','"
-hive $HIVE_OPTS -e "create table ${TARGET_DB}.category (context string, id string, name string) row format delimited fields terminated by ','"
-hive $HIVE_OPTS -e "create table ${TARGET_DB}.concept (category string, id string, name string) row format delimited fields terminated by ','"
-hive $HIVE_OPTS -e "load data inpath '${TMP}/contexts.csv' into table ${TARGET_DB}.context"
-hive $HIVE_OPTS -e "load data inpath '${TMP}/categories.csv' into table ${TARGET_DB}.category"
-hive $HIVE_OPTS -e "load data inpath '${TMP}/concepts.csv' into table ${TARGET_DB}.concept"
+hive $HIVE_OPTS -e "create table ${TARGET_DB}.context_csv (id string, name string) row format delimited fields terminated by ','"
+hive $HIVE_OPTS -e "load data inpath '${TMP}/contexts.csv' into table ${TARGET_DB}.context_csv"
+hive $HIVE_OPTS -e "create table ${TARGET_DB}.context stored as parquet as select * from ${TARGET_DB}.context_csv"
+hive $HIVE_OPTS -e "drop table ${TARGET_DB}.context_csv purge"
+
+hive $HIVE_OPTS -e "create table ${TARGET_DB}.category_csv (context string, id string, name string) row format delimited fields terminated by ','"
+hive $HIVE_OPTS -e "load data inpath '${TMP}/categories.csv' into table ${TARGET_DB}.category_csv"
+hive $HIVE_OPTS -e "create table ${TARGET_DB}.category stored as parquet as select * from ${TARGET_DB}.category_csv"
+hive $HIVE_OPTS -e "drop table ${TARGET_DB}.category_csv purge"
+
+hive $HIVE_OPTS -e "create table ${TARGET_DB}.concept_csv (category string, id string, name string) row format delimited fields terminated by ','"
+hive $HIVE_OPTS -e "load data inpath '${TMP}/concepts.csv' into table ${TARGET_DB}.concept_csv"
+hive $HIVE_OPTS -e "create table ${TARGET_DB}.concept stored as parquet as select * from ${TARGET_DB}.concept_csv"
+hive $HIVE_OPTS -e "drop table ${TARGET_DB}.concept_csv purge"
 
 echo "Cleaning up"
 rm concepts.csv
@@ -7,15 +7,28 @@ then
 fi
 
 IMPALA_HDFS_NODE=''
-if hdfs dfs -test -e hdfs://impala-cluster-mn1.openaire.eu >/dev/null 2>&1; then
-  IMPALA_HDFS_NODE='hdfs://impala-cluster-mn1.openaire.eu:8020'
-elif hdfs dfs -test -e hdfs://impala-cluster-mn2.openaire.eu >/dev/null 2>&1; then
-  IMPALA_HDFS_NODE='hdfs://impala-cluster-mn2.openaire.eu:8020'
-else
-  echo -e "\n\nPROBLEM WHEN SETTING THE HDFS-NODE FOR IMPALA CLUSTER!\n\n"
+COUNTER=0
+
+while [ $COUNTER -lt 3 ]; do
+  if hdfs dfs -test -e hdfs://impala-cluster-mn1.openaire.eu/tmp >/dev/null 2>&1; then
+    IMPALA_HDFS_NODE='hdfs://impala-cluster-mn1.openaire.eu:8020'
+    break
+  elif hdfs dfs -test -e hdfs://impala-cluster-mn2.openaire.eu/tmp >/dev/null 2>&1; then
+    IMPALA_HDFS_NODE='hdfs://impala-cluster-mn2.openaire.eu:8020'
+    break
+  else
+    IMPALA_HDFS_NODE=''
+    sleep 1
+  fi
+  ((COUNTER++))
+done
+
+if [ -z "$IMPALA_HDFS_NODE" ]; then
+  echo -e "\n\nPROBLEM WHEN SETTING THE HDFS-NODE FOR IMPALA CLUSTER! $COUNTER\n\n"
   exit 1
 fi
-echo "Active IMPALA HDFS Node: ${IMPALA_HDFS_NODE}"
+echo "Active IMPALA HDFS Node: ${IMPALA_HDFS_NODE} , after ${COUNTER} retries."
+
 
 export HADOOP_USER_NAME=$6
 export PROD_USAGE_STATS_DB="openaire_prod_usage_stats"
@@ -85,12 +85,12 @@ hive $HIVE_OPTS --database ${2}_funded -e "show tables" | grep -v WARN | sed "s/
 hive -f foo
 echo "Updated shadow monitor funded database"
 
-echo "Updating shadow monitor insitutions database"
+echo "Updating shadow monitor institutions database"
 hive -e "drop database if exists ${SHADOW}_institutions cascade"
 hive -e "create database if not exists ${SHADOW}_institutions"
 hive $HIVE_OPTS --database ${2}_institutions -e "show tables" | grep -v WARN | sed "s/\(.*\)/create view ${SHADOW}_institutions.\1 as select * from ${2}_institutions.\1;/" > foo
 hive -f foo
-echo "Shadow db monitor insitutions ready!"
+echo "Shadow db monitor institutions ready!"
 
 echo "Updating shadow monitor RIs database"
 for i in $contexts
@@ -81,7 +81,11 @@ create table TARGET.result stored as parquet as
 	'openorgs____::8839b55dae0c84d56fd533f52d5d483a', -- Leibniz Institute of Ecological Urban and Regional Development
 	'openorgs____::526468206bca24c1c90da6a312295cf4', -- Cyprus University of Technology
 	'openorgs____::b5ca9d4340e26454e367e2908ef3872f', -- Alma Mater Studiorum University of Bologna
-	'openorgs____::a6340e6ecf60f6bba163659df985b0f2' -- TU Dresden
+	'openorgs____::a6340e6ecf60f6bba163659df985b0f2', -- TU Dresden
+	'openorgs____::64badd35233ba2cd4946368ef2f4cf57', -- University of Vienna
+	'openorgs____::7501d66d2297a963ebfb075c43fff88e', -- Royal Institute of Technology
+	'openorgs____::d5eb679abdd31f70fcd4c8ba711148bf', -- Sorbonne University
+	'openorgs____::b316f25380d106aac402f5ae8653910d' -- Centre for Research on Ecology and Forestry Applications
 	) )) foo;
 
 create view if not exists TARGET.category as select * from SOURCE.category;
@@ -63,5 +63,7 @@ create table TARGET.result stored as parquet as
 	'openorgs____::b5ca9d4340e26454e367e2908ef3872f', -- Alma Mater Studiorum University of Bologna
 	'openorgs____::a6340e6ecf60f6bba163659df985b0f2', -- TU Dresden
 	'openorgs____::64badd35233ba2cd4946368ef2f4cf57', -- University of Vienna
-	'openorgs____::7501d66d2297a963ebfb075c43fff88e' -- Royal Institute of Technology
+	'openorgs____::7501d66d2297a963ebfb075c43fff88e', -- Royal Institute of Technology
+	'openorgs____::d5eb679abdd31f70fcd4c8ba711148bf', -- Sorbonne University
+	'openorgs____::b316f25380d106aac402f5ae8653910d' -- Centre for Research on Ecology and Forestry Applications
 ))) foo;