Compare commits

...

27 Commits

Author SHA1 Message Date
Miriam Baglioni c8a88b2187 [DataciteHostedByMap] added entry for EBRAINS 2024-04-04 09:14:58 +02:00
Claudio Atzori 26b97aa5ed Merge pull request '[BETA] fixed the result_country definition and updated the stats DB copy procedure' (#416) from antonis.lempesis/dnet-hadoop:beta into beta
Reviewed-on: #416
2024-04-03 12:36:03 +02:00
Lampros Smyrnaios b7c8acc563 - Update the code which acquires the "IMPALA_HDFS_NODE", to test the "tmp"-dir, instead of the base-dir and introduce retries, to overcome potential file-system failures. This change was suggested by "Sebastian Tymkow" and "Grzegorz Bakalarski".
- Fix typos.
2024-04-03 13:15:37 +03:00
Michele Artini 71d6e02886 Merge branch 'beta' of code-repo.d4science.org:D-Net/dnet-hadoop into beta 2024-04-03 09:50:41 +02:00
Michele Artini 02c9a311c8 base datainfo with trust=0.89 2024-04-03 09:50:21 +02:00
Miriam Baglioni 42846d3b91 [OpenCitation] add compression option when writing the sequence file 2024-04-03 09:25:00 +02:00
Miriam Baglioni 4f0a044245 Merge pull request 'Add action set creation for Datacite affiliations' (#413) from 9647_datacite_affiliations into beta
Reviewed-on: #413
2024-04-02 17:33:38 +02:00
Miriam Baglioni 4bb504e693 Merge pull request '[UsageCount] fixed error' (#415) from UsageStatsRecordDS into beta
Reviewed-on: #415
2024-04-02 17:06:12 +02:00
Serafeim Chatzopoulos cbe13a5c61 Fix datacite input path in properties file 2024-04-02 18:00:35 +03:00
Miriam Baglioni 9c9a9562ae [UsageCount] fixed error 2024-04-02 16:56:37 +02:00
Miriam Baglioni 2c4440951f Merge pull request '[UsageCount] add check in case the datasource is not matched against those present in the graph' (#414) from UsageStatsRecordDS into beta
Reviewed-on: #414
2024-04-02 16:30:39 +02:00
Miriam Baglioni b42bdd5fb3 [UsageCount] add check in case the datasource is not matched against those present in the graph 2024-04-02 16:28:27 +02:00
Miriam Baglioni 64cbd8abe9 Merge pull request '[UsageCount] Usage count per result split by datasource' (#318) from UsageStatsRecordDS into beta
Reviewed-on: #318
2024-04-02 10:21:39 +02:00
Antonis Lempesis df6e3bda04 added new orgs in monitor 2024-04-01 22:45:29 +03:00
Antonis Lempesis 573b081f1d added new orgs in monitor 2024-04-01 22:24:46 +03:00
Serafeim Chatzopoulos 0eb0701b26 Add action set creation for Datacite affiliations 2024-04-01 17:23:26 +03:00
Antonis Lempesis 0bf2a7a359 fixed the result_country definition 2024-04-01 15:23:22 +03:00
Claudio Atzori 24227ab598 Merge pull request '[BETA] fixed typo in indicator query' (#411) from antonis.lempesis/dnet-hadoop:beta into beta
Reviewed-on: #411
2024-03-27 13:56:43 +01:00
Antonis Lempesis 9ff44eed96 fixed typo in indicator query
added more institutions
2024-03-27 14:39:01 +02:00
Claudio Atzori cff6040424 Merge pull request '[BETA] added missing EOS, Generate tables with parquet-files, instead of csv in the contexts.sh script' (#409) from antonis.lempesis/dnet-hadoop:beta into beta
Reviewed-on: #409
2024-03-27 12:04:04 +01:00
Antonis Lempesis 1fee4124e0 added missing EOS 2024-03-27 12:58:25 +02:00
Claudio Atzori 9e700a8b0d Merge pull request 'adding context information to projects and datasources' (#407) from taggingProjects into beta
Reviewed-on: #407
2024-03-26 14:53:38 +01:00
Lampros Smyrnaios 036ba03fcd Generate tables with parquet-files, instead of csv, in "dhp-stats-update/.../contexts.sh" script. 2024-03-26 13:29:04 +02:00
Miriam Baglioni a418dacb47 [UsageCount] code extention to include also the name of the datasource 2024-01-29 18:12:33 +01:00
Miriam Baglioni e9131f4e4a mergin with branch beta 2024-01-29 16:27:18 +01:00
Miriam Baglioni 4c9bc4c3a5 refactoring 2023-06-30 19:05:15 +02:00
Miriam Baglioni 55ea485783 [UsageCount] split the count for result at the level of the datasource. for each indicator one unit is specified for each datasource contrinuting to that indicator value. The datasource key is the value of the key element in the unit for the measure, while the count for that datasource is in the value 2023-06-30 18:39:30 +02:00
29 changed files with 723 additions and 59 deletions

View File

@ -67,6 +67,9 @@ public class PrepareAffiliationRelations implements Serializable {
final String openapcInputPath = parser.get("openapcInputPath");
log.info("openapcInputPath: {}", openapcInputPath);
final String dataciteInputPath = parser.get("dataciteInputPath");
log.info("dataciteInputPath: {}", dataciteInputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
@ -93,9 +96,15 @@ public class PrepareAffiliationRelations implements Serializable {
JavaPairRDD<Text, Text> openAPCRelations = prepareAffiliationRelations(
spark, openapcInputPath, collectedFromOpenAPC);
List<KeyValue> collectedFromDatacite = OafMapperUtils
.listKeyValues(ModelConstants.DATACITE_ID, "Datacite");
JavaPairRDD<Text, Text> dataciteRelations = prepareAffiliationRelations(
spark, dataciteInputPath, collectedFromDatacite);
crossrefRelations
.union(pubmedRelations)
.union(openAPCRelations)
.union(dataciteRelations)
.saveAsHadoopFile(
outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);

View File

@ -88,7 +88,7 @@ public class CreateActionSetSparkJob implements Serializable {
private static void extractContent(SparkSession spark, String inputPath, String outputPath) {
getTextTextJavaPairRDD(spark, inputPath)
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);// , GzipCodec.class);
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
}
private static JavaPairRDD<Text, Text> getTextTextJavaPairRDD(SparkSession spark, String inputPath) {

View File

@ -5,6 +5,7 @@ import static eu.dnetlib.dhp.actionmanager.Constants.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
@ -13,7 +14,9 @@ import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
@ -68,18 +71,59 @@ public class SparkAtomicActionUsageJob implements Serializable {
final String workingPath = parser.get("workingPath");
final String datasourcePath = parser.get("datasourcePath");
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
prepareData(dbname, spark, workingPath + "/usageDb", "usage_stats", "result_id");
prepareResultData(
dbname, spark, workingPath + "/usageDb",
"usage_stats",
"result_id",
"repository_id",
datasourcePath);
prepareData(dbname, spark, workingPath + "/projectDb", "project_stats", "id");
prepareData(dbname, spark, workingPath + "/datasourceDb", "datasource_stats", "repository_id");
writeActionSet(spark, workingPath, outputPath);
});
}
private static void prepareResultData(String dbname, SparkSession spark, String workingPath, String tableName,
String resultAttributeName, String datasourceAttributeName,
String datasourcePath) {
Dataset<UsageStatsResultModel> resultModel = spark
.sql(
String
.format(
"select %s as id, %s as datasourceId, sum(downloads) as downloads, sum(views) as views " +
"from %s.%s group by %s, %s",
resultAttributeName, datasourceAttributeName, dbname, tableName, resultAttributeName,
datasourceAttributeName))
.as(Encoders.bean(UsageStatsResultModel.class));
Dataset<Datasource> datasource = readPath(spark, datasourcePath, Datasource.class)
.filter((FilterFunction<Datasource>) d -> !d.getDataInfo().getDeletedbyinference())
.map((MapFunction<Datasource, Datasource>) d -> {
d.setId(d.getId().substring(3));
return d;
}, Encoders.bean(Datasource.class));
resultModel
.joinWith(datasource, resultModel.col("datasourceId").equalTo(datasource.col("id")), "left")
.map((MapFunction<Tuple2<UsageStatsResultModel, Datasource>, UsageStatsResultModel>) t2 -> {
UsageStatsResultModel usrm = t2._1();
if(Optional.ofNullable(t2._2()).isPresent())
usrm.setDatasourceId(usrm.getDatasourceId() + "||" + t2._2().getOfficialname().getValue());
else
usrm.setDatasourceId(usrm.getDatasourceId() + "||NO_MATCH_FOUND");
return usrm;
}, Encoders.bean(UsageStatsResultModel.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingPath);
}
private static void prepareData(String dbname, SparkSession spark, String workingPath, String tableName,
String attribute_name) {
spark
@ -115,15 +159,62 @@ public class SparkAtomicActionUsageJob implements Serializable {
}
public static Measure newMeasureInstance(String id) {
Measure m = new Measure();
m.setId(id);
m.setUnit(new ArrayList<>());
return m;
}
private static Dataset<Result> getFinalIndicatorsResult(SparkSession spark, String inputPath) {
return readPath(spark, inputPath, UsageStatsModel.class)
.map((MapFunction<UsageStatsModel, Result>) usm -> {
return readPath(spark, inputPath, UsageStatsResultModel.class)
.groupByKey((MapFunction<UsageStatsResultModel, String>) usm -> usm.getId(), Encoders.STRING())
.mapGroups((MapGroupsFunction<String, UsageStatsResultModel, Result>) (k, it) -> {
Result r = new Result();
r.setId("50|" + usm.getId());
r.setMeasures(getMeasure(usm.getDownloads(), usm.getViews()));
r.setId("50|" + k);
// id = download or view and unit = list of key value pairs
Measure download = newMeasureInstance("downloads");
Measure view = newMeasureInstance("views");
UsageStatsResultModel first = it.next();
addCountForDatasource(download, first, view);
it.forEachRemaining(usm -> {
addCountForDatasource(download, usm, view);
});
r.setMeasures(Arrays.asList(download, view));
return r;
}, Encoders.bean(Result.class));
}, Encoders.bean(Result.class))
// .map((MapFunction<UsageStatsResultModel, Result>) usm -> {
// Result r = new Result();
// r.setId("50|" + usm.getId());
// r.setMeasures(getMeasure(usm.getDownloads(), usm.getViews()));
// return r;
// }, Encoders.bean(Result.class));
;
}
private static void addCountForDatasource(Measure download, UsageStatsResultModel usm, Measure view) {
DataInfo dataInfo = OafMapperUtils
.dataInfo(
false,
UPDATE_DATA_INFO_TYPE,
true,
false,
OafMapperUtils
.qualifier(
UPDATE_MEASURE_USAGE_COUNTS_CLASS_ID,
UPDATE_CLASS_NAME,
ModelConstants.DNET_PROVENANCE_ACTIONS,
ModelConstants.DNET_PROVENANCE_ACTIONS),
"");
download
.getUnit()
.add(
OafMapperUtils
.newKeyValueInstance(usm.getDatasourceId(), String.valueOf(usm.getDownloads()), dataInfo));
view
.getUnit()
.add(OafMapperUtils.newKeyValueInstance(usm.getDatasourceId(), String.valueOf(usm.getViews()), dataInfo));
}
private static Dataset<Project> getFinalIndicatorsProject(SparkSession spark, String inputPath) {

View File

@ -0,0 +1,18 @@
package eu.dnetlib.dhp.actionmanager.usagestats;
/**
* @author miriam.baglioni
* @Date 30/06/23
*/
public class UsageStatsResultModel extends UsageStatsModel {
private String datasourceId;
public String getDatasourceId() {
return datasourceId;
}
public void setDatasourceId(String datasourceId) {
this.datasourceId = datasourceId;
}
}

View File

@ -23,6 +23,12 @@
"paramDescription": "the path to get the input data from OpenAPC",
"paramRequired": true
},
{
"paramName": "dip",
"paramLongName": "dataciteInputPath",
"paramDescription": "the path to get the input data from Datacite",
"paramRequired": true
},
{
"paramName": "o",
"paramLongName": "outputPath",

View File

@ -34,4 +34,6 @@ oozie.wf.application.path=${oozieTopWfApplicationPath}
crossrefInputPath=/data/bip-affiliations/crossref-data.json
pubmedInputPath=/data/bip-affiliations/pubmed-data.json
openapcInputPath=/data/bip-affiliations/openapc-data.json
dataciteInputPath=/data/bip-affiliations/datacite-data.json
outputPath=/tmp/crossref-affiliations-output-v5

View File

@ -13,6 +13,10 @@
<name>openapcInputPath</name>
<description>the path where to find the inferred affiliation relations from OpenAPC</description>
</property>
<property>
<name>dataciteInputPath</name>
<description>the path where to find the inferred affiliation relations from Datacite</description>
</property>
<property>
<name>outputPath</name>
<description>the path where to store the actionset</description>
@ -107,6 +111,8 @@
<arg>--crossrefInputPath</arg><arg>${crossrefInputPath}</arg>
<arg>--pubmedInputPath</arg><arg>${pubmedInputPath}</arg>
<arg>--openapcInputPath</arg><arg>${openapcInputPath}</arg>
<arg>--dataciteInputPath</arg><arg>${dataciteInputPath}</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
</spark>
<ok to="End"/>

View File

@ -28,5 +28,11 @@
"paramLongName": "workingPath",
"paramDescription": "the workingPath where to save the content of the usage_stats table",
"paramRequired": true
},
{
"paramName": "dp",
"paramLongName": "datasourcePath",
"paramDescription": "the workingPath where to save the content of the usage_stats table",
"paramRequired": true
}
]

View File

@ -90,6 +90,7 @@
<arg>--outputPath</arg><arg>${outputPath}</arg>
<arg>--usagestatsdb</arg><arg>${usagestatsdb}</arg>
<arg>--workingPath</arg><arg>${workingDir}</arg>
<arg>--datasourcePath</arg><arg>${datasourcePath}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>

View File

@ -390,6 +390,18 @@ base_dc:link (I used dc:identifier)
</xsl:choose>
</oaf:relation>
</xsl:for-each>
<oaf:datainfo>
<oaf:inferred>false</oaf:inferred>
<oaf:deletedbyinference>false</oaf:deletedbyinference>
<oaf:trust>0.89</oaf:trust>
<oaf:inferenceprovenance/>
<oaf:provenanceaction classid="sysimport:crosswalk:aggregator"
classname="sysimport:crosswalk:aggregator"
schemeid="dnet:provenanceActions"
schemename="dnet:provenanceActions"/>
</oaf:datainfo>
</metadata>
<xsl:copy-of select="//*[local-name() = 'about']" />
</record>

View File

@ -429,6 +429,17 @@
</xsl:choose>
</oaf:relation>
</xsl:for-each>
<oaf:datainfo>
<oaf:inferred>false</oaf:inferred>
<oaf:deletedbyinference>false</oaf:deletedbyinference>
<oaf:trust>0.89</oaf:trust>
<oaf:inferenceprovenance/>
<oaf:provenanceaction classid="sysimport:crosswalk:aggregator"
classname="sysimport:crosswalk:aggregator"
schemeid="dnet:provenanceActions"
schemename="dnet:provenanceActions"/>
</oaf:datainfo>
</metadata>
<xsl:copy-of select="//*[local-name() = 'about']" />
</record>

View File

@ -1048,5 +1048,11 @@
"openaire_id": "re3data_____::r3d100010399",
"datacite_name": "ZEW Forschungsdatenzentrum",
"official_name": "ZEW Forschungsdatenzentrum"
},
"HBP.NEUROINF": {
"openaire_id": "fairsharing_::2975",
"datacite_name": "EBRAINS",
"official_name": "EBRAINS"
}
}

View File

@ -87,6 +87,7 @@ public class PrepareAffiliationRelationsTest {
"-crossrefInputPath", crossrefAffiliationRelationPath,
"-pubmedInputPath", crossrefAffiliationRelationPath,
"-openapcInputPath", crossrefAffiliationRelationPath,
"-dataciteInputPath", crossrefAffiliationRelationPath,
"-outputPath", outputPath
});
@ -103,7 +104,7 @@ public class PrepareAffiliationRelationsTest {
// );
// }
// count the number of relations
assertEquals(60, tmp.count());
assertEquals(80, tmp.count());
Dataset<Relation> dataset = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class));
dataset.createOrReplaceTempView("result");
@ -114,7 +115,7 @@ public class PrepareAffiliationRelationsTest {
// verify that we have equal number of bi-directional relations
Assertions
.assertEquals(
30, execVerification
40, execVerification
.filter(
"relClass='" + ModelConstants.HAS_AUTHOR_INSTITUTION + "'")
.collectAsList()
@ -122,7 +123,7 @@ public class PrepareAffiliationRelationsTest {
Assertions
.assertEquals(
30, execVerification
40, execVerification
.filter(
"relClass='" + ModelConstants.IS_AUTHOR_INSTITUTION_OF + "'")
.collectAsList()

View File

@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.oaf.Measure;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Result;
@ -66,10 +67,380 @@ public class SparkAtomicActionCountJobTest {
spark.stop();
}
@Test
void testUsageStatsDb2() {
String usageScoresPath = getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/usagestats/test2")
.getPath();
SparkAtomicActionUsageJob.writeActionSet(spark, usageScoresPath, workingDir.toString() + "/actionSet");
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<AtomicAction> tmp = sc
.sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class)
.map(usm -> OBJECT_MAPPER.readValue(usm._2.getBytes(), AtomicAction.class));
// .map(aa -> (Result) aa.getPayload());
Assertions.assertEquals(7, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("50|")).count());
Assertions.assertEquals(9, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("10|")).count());
Assertions.assertEquals(9, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("40|")).count());
tmp.foreach(r -> Assertions.assertEquals(2, ((OafEntity) r.getPayload()).getMeasures().size()));
tmp
.foreach(
r -> ((OafEntity) r.getPayload())
.getMeasures()
.stream()
.forEach(
m -> m
.getUnit()
.stream()
.forEach(u -> Assertions.assertFalse(u.getDataInfo().getDeletedbyinference()))));
tmp
.foreach(
r -> ((OafEntity) r.getPayload())
.getMeasures()
.stream()
.forEach(
m -> m.getUnit().stream().forEach(u -> Assertions.assertTrue(u.getDataInfo().getInferred()))));
tmp
.foreach(
r -> ((OafEntity) r.getPayload())
.getMeasures()
.stream()
.forEach(
m -> m
.getUnit()
.stream()
.forEach(u -> Assertions.assertFalse(u.getDataInfo().getInvisible()))));
tmp
.foreach(
r -> ((OafEntity) r.getPayload())
.getMeasures()
.stream()
.forEach(
m -> m
.getUnit()
.stream()
.forEach(
u -> Assertions
.assertEquals(
"measure:usage_counts",
u.getDataInfo().getProvenanceaction().getClassid()))));
tmp
.foreach(
r -> ((OafEntity) r.getPayload())
.getMeasures()
.stream()
.forEach(
m -> m
.getUnit()
.stream()
.forEach(
u -> Assertions
.assertEquals(
"Inferred by OpenAIRE",
u.getDataInfo().getProvenanceaction().getClassname()))));
tmp
.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("40|"))
.foreach(
r -> ((OafEntity) r.getPayload())
.getMeasures()
.stream()
.forEach(
m -> m
.getUnit()
.stream()
.forEach(
u -> Assertions
.assertEquals(
"count",
u.getKey()))));
Assertions
.assertEquals(
1,
tmp
.filter(
r -> ((OafEntity) r.getPayload())
.getId()
.equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6"))
.count());
OafEntity entity = (OafEntity) tmp
.filter(
aa -> ((OafEntity) aa.getPayload()).getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6"))
.first()
.getPayload();
entity
.getMeasures()
.stream()
.forEach(
m -> Assertions.assertEquals(3, m.getUnit().size()));
Measure downloads = entity
.getMeasures()
.stream()
.filter(m -> m.getId().equals("downloads"))
.findFirst()
.get();
Assertions
.assertEquals(
String.valueOf(0),
downloads.getUnit().stream().filter(u -> u.getKey().equals("10|fake1")).findFirst().get().getValue());
Assertions
.assertEquals(
String.valueOf(0),
downloads.getUnit().stream().filter(u -> u.getKey().equals("10|fake2")).findFirst().get().getValue());
Assertions
.assertEquals(
String.valueOf(1),
downloads.getUnit().stream().filter(u -> u.getKey().equals("10|fake3")).findFirst().get().getValue());
Measure views = entity
.getMeasures()
.stream()
.filter(m -> m.getId().equals("views"))
.findFirst()
.get();
Assertions
.assertEquals(
String.valueOf(5),
views.getUnit().stream().filter(u -> u.getKey().equals("10|fake1")).findFirst().get().getValue());
Assertions
.assertEquals(
String.valueOf(1),
views.getUnit().stream().filter(u -> u.getKey().equals("10|fake2")).findFirst().get().getValue());
Assertions
.assertEquals(
String.valueOf(3),
views.getUnit().stream().filter(u -> u.getKey().equals("10|fake3")).findFirst().get().getValue());
tmp
.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("10|"))
.foreach(
r -> ((OafEntity) r.getPayload())
.getMeasures()
.stream()
.forEach(
m -> m
.getUnit()
.stream()
.forEach(
u -> Assertions
.assertEquals(
"count",
u.getKey()))));
Assertions
.assertEquals(
"0",
tmp
.map(r -> ((OafEntity) r.getPayload()))
.filter(r -> r.getId().equals("40|f1__________::53575dc69e9ace947e02d47ecd54a7a6"))
.collect()
.get(0)
.getMeasures()
.stream()
.filter(m -> m.getId().equals("downloads"))
.collect(Collectors.toList())
.get(0)
.getUnit()
.get(0)
.getValue());
Assertions
.assertEquals(
"5",
tmp
.map(r -> ((OafEntity) r.getPayload()))
.filter(r -> r.getId().equals("40|f1__________::53575dc69e9ace947e02d47ecd54a7a6"))
.collect()
.get(0)
.getMeasures()
.stream()
.filter(m -> m.getId().equals("views"))
.collect(Collectors.toList())
.get(0)
.getUnit()
.get(0)
.getValue());
Assertions
.assertEquals(
"0",
tmp
.map(r -> ((OafEntity) r.getPayload()))
.filter(r -> r.getId().equals("40|f11_________::17eda2ff77407538fbe5d3d719b9d1c0"))
.collect()
.get(0)
.getMeasures()
.stream()
.filter(m -> m.getId().equals("downloads"))
.collect(Collectors.toList())
.get(0)
.getUnit()
.get(0)
.getValue());
Assertions
.assertEquals(
"1",
tmp
.map(r -> ((OafEntity) r.getPayload()))
.filter(r -> r.getId().equals("40|f11_________::17eda2ff77407538fbe5d3d719b9d1c0"))
.collect()
.get(0)
.getMeasures()
.stream()
.filter(m -> m.getId().equals("views"))
.collect(Collectors.toList())
.get(0)
.getUnit()
.get(0)
.getValue());
Assertions
.assertEquals(
"2",
tmp
.map(r -> ((OafEntity) r.getPayload()))
.filter(r -> r.getId().equals("40|f12_________::3085e4c6e051378ca6157fe7f0430c1f"))
.collect()
.get(0)
.getMeasures()
.stream()
.filter(m -> m.getId().equals("downloads"))
.collect(Collectors.toList())
.get(0)
.getUnit()
.get(0)
.getValue());
Assertions
.assertEquals(
"6",
tmp
.map(r -> ((OafEntity) r.getPayload()))
.filter(r -> r.getId().equals("40|f12_________::3085e4c6e051378ca6157fe7f0430c1f"))
.collect()
.get(0)
.getMeasures()
.stream()
.filter(m -> m.getId().equals("views"))
.collect(Collectors.toList())
.get(0)
.getUnit()
.get(0)
.getValue());
Assertions
.assertEquals(
"0",
tmp
.map(r -> ((OafEntity) r.getPayload()))
.filter(r -> r.getId().equals("10|d1__________::53575dc69e9ace947e02d47ecd54a7a6"))
.collect()
.get(0)
.getMeasures()
.stream()
.filter(m -> m.getId().equals("downloads"))
.collect(Collectors.toList())
.get(0)
.getUnit()
.get(0)
.getValue());
Assertions
.assertEquals(
"5",
tmp
.map(r -> ((OafEntity) r.getPayload()))
.filter(r -> r.getId().equals("10|d1__________::53575dc69e9ace947e02d47ecd54a7a6"))
.collect()
.get(0)
.getMeasures()
.stream()
.filter(m -> m.getId().equals("views"))
.collect(Collectors.toList())
.get(0)
.getUnit()
.get(0)
.getValue());
Assertions
.assertEquals(
"0",
tmp
.map(r -> ((OafEntity) r.getPayload()))
.filter(r -> r.getId().equals("10|d11_________::17eda2ff77407538fbe5d3d719b9d1c0"))
.collect()
.get(0)
.getMeasures()
.stream()
.filter(m -> m.getId().equals("downloads"))
.collect(Collectors.toList())
.get(0)
.getUnit()
.get(0)
.getValue());
Assertions
.assertEquals(
"1",
tmp
.map(r -> ((OafEntity) r.getPayload()))
.filter(r -> r.getId().equals("10|d11_________::17eda2ff77407538fbe5d3d719b9d1c0"))
.collect()
.get(0)
.getMeasures()
.stream()
.filter(m -> m.getId().equals("views"))
.collect(Collectors.toList())
.get(0)
.getUnit()
.get(0)
.getValue());
Assertions
.assertEquals(
"2",
tmp
.map(r -> ((OafEntity) r.getPayload()))
.filter(r -> r.getId().equals("10|d12_________::3085e4c6e051378ca6157fe7f0430c1f"))
.collect()
.get(0)
.getMeasures()
.stream()
.filter(m -> m.getId().equals("downloads"))
.collect(Collectors.toList())
.get(0)
.getUnit()
.get(0)
.getValue());
Assertions
.assertEquals(
"6",
tmp
.map(r -> ((OafEntity) r.getPayload()))
.filter(r -> r.getId().equals("10|d12_________::3085e4c6e051378ca6157fe7f0430c1f"))
.collect()
.get(0)
.getMeasures()
.stream()
.filter(m -> m.getId().equals("views"))
.collect(Collectors.toList())
.get(0)
.getUnit()
.get(0)
.getValue());
}
@Test
void testMatch() {
String usageScoresPath = getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/usagestats")
.getResource("/eu/dnetlib/dhp/actionmanager/usagestats/test1")
.getPath();
SparkAtomicActionUsageJob.writeActionSet(spark, usageScoresPath, workingDir.toString() + "/actionSet");
@ -144,6 +515,39 @@ public class SparkAtomicActionCountJobTest {
u.getDataInfo().getProvenanceaction().getClassname()))));
tmp
.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("40|"))
.foreach(
r -> ((OafEntity) r.getPayload())
.getMeasures()
.stream()
.forEach(
m -> m
.getUnit()
.stream()
.forEach(
u -> Assertions
.assertEquals(
"count",
u.getKey()))));
tmp
.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("50|"))
.foreach(
r -> ((OafEntity) r.getPayload())
.getMeasures()
.stream()
.forEach(
m -> m
.getUnit()
.stream()
.forEach(
u -> Assertions
.assertEquals(
"10|fake1",
u.getKey()))));
tmp
.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("10|"))
.foreach(
r -> ((OafEntity) r.getPayload())
.getMeasures()
@ -465,5 +869,4 @@ public class SparkAtomicActionCountJobTest {
.get(0)
.getValue());
}
}

View File

@ -0,0 +1,9 @@
{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","datasourceId":"10|fake1","downloads":0,"views":5}
{"id":"doi_________::17eda2ff77407538fbe5d3d719b9d1c0","datasourceId":"10|fake1","downloads":0,"views":1}
{"id":"doi_________::1d4dc08605fd0a2be1105d30c63bfea1","datasourceId":"10|fake1","downloads":1,"views":3}
{"id":"doi_________::2e3527822854ca9816f6dfea5bff61a8","datasourceId":"10|fake1","downloads":1,"views":1}
{"id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","datasourceId":"10|fake1","downloads":2,"views":6}
{"id":"doi_________::33f710e6dd30cc5e67e35b371ddc33cf","datasourceId":"10|fake1","downloads":0,"views":1}
{"id":"doi_________::39738ebf10654732dd3a7af9f24655f8","datasourceId":"10|fake1","downloads":1,"views":3}
{"id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","datasourceId":"10|fake1","downloads":1,"views":10}
{"id":"doi_________::4938a71a884dd481d329657aa543b850","datasourceId":"10|fake1","downloads":0,"views":3}

View File

@ -0,0 +1,9 @@
{"id":"d1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":5}
{"id":"d11_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1}
{"id":"d11_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3}
{"id":"d11_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1}
{"id":"d12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":6}
{"id":"d12_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1}
{"id":"d12_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3}
{"id":"d13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":10}
{"id":"d13_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3}

View File

@ -0,0 +1,9 @@
{"id":"f1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":5}
{"id":"f11_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1}
{"id":"f11_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3}
{"id":"f11_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1}
{"id":"f12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":6}
{"id":"f12_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1}
{"id":"f12_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3}
{"id":"f13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":10}
{"id":"f13_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3}

View File

@ -0,0 +1,9 @@
{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","datasourceId":"10|fake1","downloads":0,"views":5}
{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","datasourceId":"10|fake2","downloads":0,"views":1}
{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","datasourceId":"10|fake3","downloads":1,"views":3}
{"id":"doi_________::2e3527822854ca9816f6dfea5bff61a8","datasourceId":"10|fake1","downloads":1,"views":1}
{"id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","datasourceId":"10|fake1","downloads":2,"views":6}
{"id":"doi_________::33f710e6dd30cc5e67e35b371ddc33cf","datasourceId":"10|fake1","downloads":0,"views":1}
{"id":"doi_________::39738ebf10654732dd3a7af9f24655f8","datasourceId":"10|fake1","downloads":1,"views":3}
{"id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","datasourceId":"10|fake1","downloads":1,"views":10}
{"id":"doi_________::4938a71a884dd481d329657aa543b850","datasourceId":"10|fake1","downloads":0,"views":3}

View File

@ -1,9 +0,0 @@
{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":5}
{"id":"doi_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1}
{"id":"doi_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3}
{"id":"doi_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1}
{"id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":6}
{"id":"doi_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1}
{"id":"doi_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3}
{"id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":10}
{"id":"doi_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3}

View File

@ -9,15 +9,27 @@ fi
export HADOOP_USER_NAME=$2
IMPALA_HDFS_NODE=''
if hdfs dfs -test -e hdfs://impala-cluster-mn1.openaire.eu >/dev/null 2>&1; then
IMPALA_HDFS_NODE='hdfs://impala-cluster-mn1.openaire.eu:8020'
elif hdfs dfs -test -e hdfs://impala-cluster-mn2.openaire.eu >/dev/null 2>&1; then
IMPALA_HDFS_NODE='hdfs://impala-cluster-mn2.openaire.eu:8020'
else
echo -e "\n\nPROBLEM WHEN SETTING THE HDFS-NODE FOR IMPALA CLUSTER!\n\n"
COUNTER=0
while [ $COUNTER -lt 3 ]; do
if hdfs dfs -test -e hdfs://impala-cluster-mn1.openaire.eu/tmp >/dev/null 2>&1; then
IMPALA_HDFS_NODE='hdfs://impala-cluster-mn1.openaire.eu:8020'
break
elif hdfs dfs -test -e hdfs://impala-cluster-mn2.openaire.eu/tmp >/dev/null 2>&1; then
IMPALA_HDFS_NODE='hdfs://impala-cluster-mn2.openaire.eu:8020'
break
else
IMPALA_HDFS_NODE=''
sleep 1
fi
((COUNTER++))
done
if [ -z "$IMPALA_HDFS_NODE" ]; then
echo -e "\n\nPROBLEM WHEN SETTING THE HDFS-NODE FOR IMPALA CLUSTER! $COUNTER\n\n"
exit 1
fi
echo "Active IMPALA HDFS Node: ${IMPALA_HDFS_NODE}"
echo "Active IMPALA HDFS Node: ${IMPALA_HDFS_NODE} , after ${COUNTER} retries."
function copydb() {

View File

@ -9,15 +9,28 @@ fi
export HADOOP_USER_NAME=$2
IMPALA_HDFS_NODE=''
if hdfs dfs -test -e hdfs://impala-cluster-mn1.openaire.eu >/dev/null 2>&1; then
IMPALA_HDFS_NODE='hdfs://impala-cluster-mn1.openaire.eu:8020'
elif hdfs dfs -test -e hdfs://impala-cluster-mn2.openaire.eu >/dev/null 2>&1; then
IMPALA_HDFS_NODE='hdfs://impala-cluster-mn2.openaire.eu:8020'
else
echo -e "\n\nPROBLEM WHEN SETTING THE HDFS-NODE FOR IMPALA CLUSTER!\n\n"
COUNTER=0
while [ $COUNTER -lt 3 ]; do
if hdfs dfs -test -e hdfs://impala-cluster-mn1.openaire.eu/tmp >/dev/null 2>&1; then
IMPALA_HDFS_NODE='hdfs://impala-cluster-mn1.openaire.eu:8020'
break
elif hdfs dfs -test -e hdfs://impala-cluster-mn2.openaire.eu/tmp >/dev/null 2>&1; then
IMPALA_HDFS_NODE='hdfs://impala-cluster-mn2.openaire.eu:8020'
break
else
IMPALA_HDFS_NODE=''
sleep 1
fi
((COUNTER++))
done
if [ -z "$IMPALA_HDFS_NODE" ]; then
echo -e "\n\nPROBLEM WHEN SETTING THE HDFS-NODE FOR IMPALA CLUSTER! $COUNTER\n\n"
exit 1
fi
echo "Active IMPALA HDFS Node: ${IMPALA_HDFS_NODE}"
echo "Active IMPALA HDFS Node: ${IMPALA_HDFS_NODE} , after ${COUNTER} retries."
function copydb() {

View File

@ -9,15 +9,28 @@ fi
#export HADOOP_USER_NAME=$2
IMPALA_HDFS_NODE=''
if hdfs dfs -test -e hdfs://impala-cluster-mn1.openaire.eu >/dev/null 2>&1; then
IMPALA_HDFS_NODE='hdfs://impala-cluster-mn1.openaire.eu:8020'
elif hdfs dfs -test -e hdfs://impala-cluster-mn2.openaire.eu >/dev/null 2>&1; then
IMPALA_HDFS_NODE='hdfs://impala-cluster-mn2.openaire.eu:8020'
else
echo -e "\n\nPROBLEM WHEN SETTING THE HDFS-NODE FOR IMPALA CLUSTER!\n\n"
COUNTER=0
while [ $COUNTER -lt 3 ]; do
if hdfs dfs -test -e hdfs://impala-cluster-mn1.openaire.eu/tmp >/dev/null 2>&1; then
IMPALA_HDFS_NODE='hdfs://impala-cluster-mn1.openaire.eu:8020'
break
elif hdfs dfs -test -e hdfs://impala-cluster-mn2.openaire.eu/tmp >/dev/null 2>&1; then
IMPALA_HDFS_NODE='hdfs://impala-cluster-mn2.openaire.eu:8020'
break
else
IMPALA_HDFS_NODE=''
sleep 1
fi
((COUNTER++))
done
if [ -z "$IMPALA_HDFS_NODE" ]; then
echo -e "\n\nPROBLEM WHEN SETTING THE HDFS-NODE FOR IMPALA CLUSTER! $COUNTER\n\n"
exit 1
fi
echo "Active IMPALA HDFS Node: ${IMPALA_HDFS_NODE}"
echo "Active IMPALA HDFS Node: ${IMPALA_HDFS_NODE} , after ${COUNTER} retries."
function copydb() {

View File

@ -35,12 +35,20 @@ export HADOOP_USER="oozie"
export HADOOP_USER_NAME="oozie"
echo "Creating and populating impala tables"
hive $HIVE_OPTS -e "create table ${TARGET_DB}.context (id string, name string) row format delimited fields terminated by ','"
hive $HIVE_OPTS -e "create table ${TARGET_DB}.category (context string, id string, name string) row format delimited fields terminated by ','"
hive $HIVE_OPTS -e "create table ${TARGET_DB}.concept (category string, id string, name string) row format delimited fields terminated by ','"
hive $HIVE_OPTS -e "load data inpath '${TMP}/contexts.csv' into table ${TARGET_DB}.context"
hive $HIVE_OPTS -e "load data inpath '${TMP}/categories.csv' into table ${TARGET_DB}.category"
hive $HIVE_OPTS -e "load data inpath '${TMP}/concepts.csv' into table ${TARGET_DB}.concept"
hive $HIVE_OPTS -e "create table ${TARGET_DB}.context_csv (id string, name string) row format delimited fields terminated by ','"
hive $HIVE_OPTS -e "load data inpath '${TMP}/contexts.csv' into table ${TARGET_DB}.context_csv"
hive $HIVE_OPTS -e "create table ${TARGET_DB}.context stored as parquet as select * from ${TARGET_DB}.context_csv"
hive $HIVE_OPTS -e "drop table ${TARGET_DB}.context_csv purge"
hive $HIVE_OPTS -e "create table ${TARGET_DB}.category_csv (context string, id string, name string) row format delimited fields terminated by ','"
hive $HIVE_OPTS -e "load data inpath '${TMP}/categories.csv' into table ${TARGET_DB}.category_csv"
hive $HIVE_OPTS -e "create table ${TARGET_DB}.category stored as parquet as select * from ${TARGET_DB}.category_csv"
hive $HIVE_OPTS -e "drop table ${TARGET_DB}.category_csv purge"
hive $HIVE_OPTS -e "create table ${TARGET_DB}.concept_csv (category string, id string, name string) row format delimited fields terminated by ','"
hive $HIVE_OPTS -e "load data inpath '${TMP}/concepts.csv' into table ${TARGET_DB}.concept_csv"
hive $HIVE_OPTS -e "create table ${TARGET_DB}.concept stored as parquet as select * from ${TARGET_DB}.concept_csv"
hive $HIVE_OPTS -e "drop table ${TARGET_DB}.concept_csv purge"
echo "Cleaning up"
rm concepts.csv

View File

@ -7,15 +7,28 @@ then
fi
IMPALA_HDFS_NODE=''
if hdfs dfs -test -e hdfs://impala-cluster-mn1.openaire.eu >/dev/null 2>&1; then
IMPALA_HDFS_NODE='hdfs://impala-cluster-mn1.openaire.eu:8020'
elif hdfs dfs -test -e hdfs://impala-cluster-mn2.openaire.eu >/dev/null 2>&1; then
IMPALA_HDFS_NODE='hdfs://impala-cluster-mn2.openaire.eu:8020'
else
echo -e "\n\nPROBLEM WHEN SETTING THE HDFS-NODE FOR IMPALA CLUSTER!\n\n"
COUNTER=0
while [ $COUNTER -lt 3 ]; do
if hdfs dfs -test -e hdfs://impala-cluster-mn1.openaire.eu/tmp >/dev/null 2>&1; then
IMPALA_HDFS_NODE='hdfs://impala-cluster-mn1.openaire.eu:8020'
break
elif hdfs dfs -test -e hdfs://impala-cluster-mn2.openaire.eu/tmp >/dev/null 2>&1; then
IMPALA_HDFS_NODE='hdfs://impala-cluster-mn2.openaire.eu:8020'
break
else
IMPALA_HDFS_NODE=''
sleep 1
fi
((COUNTER++))
done
if [ -z "$IMPALA_HDFS_NODE" ]; then
echo -e "\n\nPROBLEM WHEN SETTING THE HDFS-NODE FOR IMPALA CLUSTER! $COUNTER\n\n"
exit 1
fi
echo "Active IMPALA HDFS Node: ${IMPALA_HDFS_NODE}"
echo "Active IMPALA HDFS Node: ${IMPALA_HDFS_NODE} , after ${COUNTER} retries."
export HADOOP_USER_NAME=$6
export PROD_USAGE_STATS_DB="openaire_prod_usage_stats"

View File

@ -85,12 +85,12 @@ hive $HIVE_OPTS --database ${2}_funded -e "show tables" | grep -v WARN | sed "s/
hive -f foo
echo "Updated shadow monitor funded database"
echo "Updating shadow monitor insitutions database"
echo "Updating shadow monitor institutions database"
hive -e "drop database if exists ${SHADOW}_institutions cascade"
hive -e "create database if not exists ${SHADOW}_institutions"
hive $HIVE_OPTS --database ${2}_institutions -e "show tables" | grep -v WARN | sed "s/\(.*\)/create view ${SHADOW}_institutions.\1 as select * from ${2}_institutions.\1;/" > foo
hive -f foo
echo "Shadow db monitor insitutions ready!"
echo "Shadow db monitor institutions ready!"
echo "Updating shadow monitor RIs database"
for i in $contexts

View File

@ -81,7 +81,11 @@ create table TARGET.result stored as parquet as
'openorgs____::8839b55dae0c84d56fd533f52d5d483a', -- Leibniz Institute of Ecological Urban and Regional Development
'openorgs____::526468206bca24c1c90da6a312295cf4', -- Cyprus University of Technology
'openorgs____::b5ca9d4340e26454e367e2908ef3872f', -- Alma Mater Studiorum University of Bologna
'openorgs____::a6340e6ecf60f6bba163659df985b0f2' -- TU Dresden
'openorgs____::a6340e6ecf60f6bba163659df985b0f2', -- TU Dresden
'openorgs____::64badd35233ba2cd4946368ef2f4cf57', -- University of Vienna
'openorgs____::7501d66d2297a963ebfb075c43fff88e', -- Royal Institute of Technology
'openorgs____::d5eb679abdd31f70fcd4c8ba711148bf', -- Sorbonne University
'openorgs____::b316f25380d106aac402f5ae8653910d' -- Centre for Research on Ecology and Forestry Applications
) )) foo;
create view if not exists TARGET.category as select * from SOURCE.category;

View File

@ -63,5 +63,7 @@ create table TARGET.result stored as parquet as
'openorgs____::b5ca9d4340e26454e367e2908ef3872f', -- Alma Mater Studiorum University of Bologna
'openorgs____::a6340e6ecf60f6bba163659df985b0f2', -- TU Dresden
'openorgs____::64badd35233ba2cd4946368ef2f4cf57', -- University of Vienna
'openorgs____::7501d66d2297a963ebfb075c43fff88e' -- Royal Institute of Technology
'openorgs____::7501d66d2297a963ebfb075c43fff88e', -- Royal Institute of Technology
'openorgs____::d5eb679abdd31f70fcd4c8ba711148bf', -- Sorbonne University
'openorgs____::b316f25380d106aac402f5ae8653910d' -- Centre for Research on Ecology and Forestry Applications
))) foo;