Merge branch 'beta_to_master_may2024' of https://code-repo.d4science.org/D-Net/dnet-hadoop into beta_to_master_may2024

This commit is contained in:
Claudio Atzori 2024-05-26 15:45:41 +02:00
commit f99eaa0376
8 changed files with 59 additions and 23 deletions

View File

@ -12,6 +12,7 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.StructType;
@ -70,6 +71,9 @@ public class CreateActionSetFromWebEntries implements Serializable {
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final String blackListInputPath = parser.get("blackListPath");
log.info("blackListInputPath: {}", blackListInputPath);
SparkConf conf = new SparkConf();
runWithSparkSession(
@ -77,19 +81,25 @@ public class CreateActionSetFromWebEntries implements Serializable {
isSparkSessionManaged,
spark -> {
createActionSet(spark, inputPath, outputPath);
createActionSet(spark, inputPath, outputPath, blackListInputPath);
});
}
public static void createActionSet(SparkSession spark, String inputPath,
String outputPath) {
String outputPath, String blackListInputPath) {
final Dataset<Row> dataset = readWebCrawl(spark, inputPath)
.filter("publication_year <= 2020 or country_code=='IE'")
.filter("country_code=='IE'")
.drop("publication_year");
dataset.flatMap((FlatMapFunction<Row, Relation>) row -> {
final Dataset<Row> blackList = readBlackList(spark, blackListInputPath);
dataset
.join(blackList, dataset.col("id").equalTo(blackList.col("OpenAlexId")), "left")
.filter((FilterFunction<Row>) r -> r.getAs("OpenAlexId") == null)
.drop("OpenAlexId")
.flatMap((FlatMapFunction<Row, Relation>) row -> {
List<Relation> ret = new ArrayList<>();
final String ror = ROR_PREFIX
+ IdentifierFactory.md5(PidCleaner.normalizePidValue("ROR", row.getAs("ror")));
@ -136,6 +146,15 @@ public class CreateActionSetFromWebEntries implements Serializable {
}
private static Dataset<Row> readBlackList(SparkSession spark, String inputPath) {
return spark
.read()
.option("header", true)
.csv(inputPath)
.select("OpenAlexId");
}
private static List<Relation> createAffiliationRelationPairPMCID(String pmcid, String ror) {
if (pmcid == null)
return new ArrayList<>();

View File

@ -16,5 +16,10 @@
"paramLongName": "isSparkSessionManaged",
"paramDescription": "the hdfs name node",
"paramRequired": false
},{
"paramName": "bl",
"paramLongName": "blackListPath",
"paramDescription": "the working path",
"paramRequired": true
}
]

View File

@ -1,2 +1,3 @@
sourcePath=/user/miriam.baglioni/openalex-snapshot/data/works/
outputPath=/tmp/miriam/webcrawlComplete/
blackListPath=/user/miriam.baglioni/openalex-blackList

View File

@ -45,6 +45,7 @@
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
<arg>--blackListPath</arg><arg>${blackListPath}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>

View File

@ -75,7 +75,11 @@ public class CreateASTest {
String inputPath = getClass()
.getResource(
"/eu/dnetlib/dhp/actionmanager/webcrawl/")
"/eu/dnetlib/dhp/actionmanager/webcrawl/input/")
.getPath();
String blackListPath = getClass()
.getResource(
"/eu/dnetlib/dhp/actionmanager/webcrawl/blackList/")
.getPath();
CreateActionSetFromWebEntries
@ -86,7 +90,8 @@ public class CreateASTest {
"-sourcePath",
inputPath,
"-outputPath",
workingDir.toString() + "/actionSet1"
workingDir.toString() + "/actionSet1",
"-blackListPath", blackListPath
});
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
@ -96,7 +101,7 @@ public class CreateASTest {
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
.map(aa -> ((Relation) aa.getPayload()));
Assertions.assertEquals(64, tmp.count());
Assertions.assertEquals(58, tmp.count());
}
@ -109,6 +114,10 @@ public class CreateASTest {
.getResource(
"/eu/dnetlib/dhp/actionmanager/webcrawl/")
.getPath();
String blackListPath = getClass()
.getResource(
"/eu/dnetlib/dhp/actionmanager/webcrawl/blackList/")
.getPath();
CreateActionSetFromWebEntries
.main(
@ -118,7 +127,8 @@ public class CreateASTest {
"-sourcePath",
inputPath,
"-outputPath",
workingDir.toString() + "/actionSet1"
workingDir.toString() + "/actionSet1",
"-blackListPath", blackListPath
});
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
@ -184,7 +194,7 @@ public class CreateASTest {
Assertions
.assertEquals(
5, tmp
2, tmp
.filter(
r -> r
.getSource()
@ -197,7 +207,7 @@ public class CreateASTest {
Assertions
.assertEquals(
5, tmp
2, tmp
.filter(
r -> r
.getTarget()
@ -210,7 +220,7 @@ public class CreateASTest {
Assertions
.assertEquals(
2, tmp
1, tmp
.filter(
r -> r
.getTarget()
@ -224,7 +234,7 @@ public class CreateASTest {
Assertions
.assertEquals(
2, tmp
1, tmp
.filter(
r -> r
.getTarget()
@ -238,7 +248,7 @@ public class CreateASTest {
Assertions
.assertEquals(
1, tmp
0, tmp
.filter(
r -> r
.getTarget()