diff --git a/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/PiwikDownloadLogs_B2SHARE.java b/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/PiwikDownloadLogs_B2SHARE.java
new file mode 100644
index 000000000..9ec6fb72e
--- /dev/null
+++ b/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/PiwikDownloadLogs_B2SHARE.java
@@ -0,0 +1,204 @@
+
+package eu.dnetlib.oa.graph.usagerawdata.export;
+
+import java.io.*;
+import java.net.URL;
+import java.net.URLConnection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author D. Pierrakos
+ */
+public class PiwikDownloadLogs_B2SHARE {
+
+	private final String piwikUrl;
+	private Date startDate;
+	private final String tokenAuth;
+
+	/*
+	 * The Piwik API method
+	 */
+	private final String APImethod = "?module=API&method=Live.getLastVisitsDetails";
+	private final String format = "&format=json";
+
+	private static final Logger logger = LoggerFactory.getLogger(PiwikDownloadLogs_B2SHARE.class);
+
+	public PiwikDownloadLogs_B2SHARE(String piwikUrl, String tokenAuth) {
+		this.piwikUrl = piwikUrl;
+		this.tokenAuth = tokenAuth;
+
+	}
+
+	private String getPiwikLogUrl() {
+		return "https://" + piwikUrl + "/";
+	}
+
+	private String getJson(String url) throws Exception {
+		try {
+			logger.debug("Connecting to download the JSON: " + url);
+			URL website = new URL(url);
+			URLConnection connection = website.openConnection();
+
+			StringBuilder response;
+			try (BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
+				response = new StringBuilder();
+				String inputLine;
+				while ((inputLine = in.readLine()) != null) {
+					response.append(inputLine);
+				}
+			}
+			return response.toString();
+		} catch (Exception e) {
+			logger.error("Failed to get URL: " + url + " Exception: " + e);
+			throw new Exception("Failed to get URL: " + url + " Exception: " + e.toString(), e);
+		}
+	}
+
+	public void GetOpenAIREB2SHARELogs(String repoLogsPath) throws Exception {
+
+		Statement statement = ConnectDB.getHiveConnection().createStatement();
+		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
+
+		List<Integer> piwikIdToVisit = new ArrayList<Integer>();
+		piwikIdToVisit.add(ExecuteWorkflow.b2SSHAREID);
+		logger.info("B2SHARE piwikId for download: " + piwikIdToVisit);
+
+		if (ExecuteWorkflow.numberOfPiwikIdsToDownload > 0
+			&& ExecuteWorkflow.numberOfPiwikIdsToDownload <= piwikIdToVisit.size()) {
+			logger.info("Trimming piwikIds list to the size of: " + ExecuteWorkflow.numberOfPiwikIdsToDownload);
+			piwikIdToVisit = piwikIdToVisit.subList(0, ExecuteWorkflow.numberOfPiwikIdsToDownload);
+		}
+
+		logger.info("Downloading for the following piwikIds: " + piwikIdToVisit);
+
+		// ExecutorService executor = Executors.newFixedThreadPool(ExecuteWorkflow.numberOfDownloadThreads);
+		for (int siteId : piwikIdToVisit) {
+			// Setting the starting period
+			Calendar start = (Calendar) ExecuteWorkflow.startingLogPeriod.clone();
+			logger.info("Starting period for log download: " + sdf.format(start.getTime()));
+
+			// Setting the ending period (last day of the month)
+			// Calendar end = (Calendar) ExecuteWorkflow.endingLogPeriod.clone();
+			Calendar end = Calendar.getInstance();
+			end.add(Calendar.DAY_OF_MONTH, -1);
+			// end.add(Calendar.MONTH, +1);
+			// end.add(Calendar.DAY_OF_MONTH, -1);
+			logger.info("Ending period for log download: " + sdf.format(end.getTime()));
+
+			logger.info("Now working on piwikId: " + siteId);
+
+			PreparedStatement st = ConnectDB.DB_HIVE_CONNECTION
+				.prepareStatement(
+					"SELECT max(timestamp) FROM " + ConnectDB.getUsageStatsDBSchema()
+						+ ".piwiklog WHERE source=?");
+			st.setInt(1, siteId);
+			Date dateMax = null;
+			ResultSet rs_date = st.executeQuery();
+			while (rs_date.next()) {
+				logger.info("Found max date: " + rs_date.getString(1) + " for repository " + siteId);
+
+				if (rs_date.getString(1) != null && !rs_date.getString(1).equals("null")
+					&& !rs_date.getString(1).equals("")) {
+					start.setTime(sdf.parse(rs_date.getString(1)));
+					dateMax = sdf.parse(rs_date.getString(1));
+				}
+			}
+			rs_date.close();
+
+			for (Calendar currDay = (Calendar) start.clone(); currDay.before(end); currDay.add(Calendar.DATE, 1)) {
+				// logger.info("Date used " + currDay.toString());
+				// Runnable worker = new WorkerThread(currDay, siteId, repoLogsPath, portalLogPath, portalMatomoID);
+				// executor.execute(worker);// calling execute method of ExecutorService
+				logger.info("Date used " + currDay.getTime().toString());
+
+				if (dateMax != null && currDay.getTime().compareTo(dateMax) <= 0) {
+					logger.info("Date found in logs " + dateMax + " and not downloading Matomo logs for " + siteId);
+				} else {
+					GetOpenAIRELogsB2SHAREForDate(currDay, siteId, repoLogsPath);
+				}
+
+			}
+		}
+		// executor.shutdown();
+		// while (!executor.isTerminated()) {
+		// }
+		// System.out.println("Finished all threads");
+	}
+
+	public void GetOpenAIRELogsB2SHAREForDate(Calendar currDay, int siteId, String repoLogsPath) throws Exception {
+		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
+
+		Date date = currDay.getTime();
+		logger.info("Downloading logs for repoid " + siteId + " and for " + sdf.format(date));
+
+		String period = "&period=day&date=" + sdf.format(date);
+		String outFolder = repoLogsPath;
+
+		String baseApiUrl = getPiwikLogUrl() + APImethod + "&idSite=" + siteId + period + format
+			+ "&expanded=5&filter_limit=1000&token_auth=" + tokenAuth;
+		String content = "";
+
+		int i = 0;
+
+		JSONParser parser = new JSONParser();
+		StringBuffer totalContent = new StringBuffer();
+		FileSystem fs = FileSystem.get(new Configuration());
+
+		do {
+			int writtenBytes = 0;
+			String apiUrl = baseApiUrl;
+
+			if (i > 0) {
+				apiUrl += "&filter_offset=" + (i * 1000);
+			}
+
+			content = getJson(apiUrl);
+			if (content.length() == 0 || content.equals("[]")) {
+				break;
+			}
+
+			FSDataOutputStream fin = fs
+				.create(
+					new Path(outFolder + "/" + siteId + "_Piwiklog" + sdf.format(date) + "_offset_" + i
+						+ ".json"),
+					true);
+			JSONArray jsonArray = (JSONArray) parser.parse(content);
+			for (Object aJsonArray : jsonArray) {
+				JSONObject jsonObjectRaw = (JSONObject) aJsonArray;
+				byte[] jsonObjectRawBytes = jsonObjectRaw.toJSONString().getBytes();
+				fin.write(jsonObjectRawBytes);
+				fin.writeChar('\n');
+
+				writtenBytes += jsonObjectRawBytes.length + 1;
+			}
+
+			fin.close();
+			System.out
+				.println(
+					Thread.currentThread().getName() + " (Finished writing) Wrote " + writtenBytes
+						+ " bytes. Filename: " + siteId + "_Piwiklog" + sdf.format(date) + "_offset_" + i
+						+ ".json");
+
+			i++;
+		} while (true);
+
+		fs.close();
+	}
+}
diff --git a/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/PiwikStatsDB_B2SHARE.java b/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/PiwikStatsDB_B2SHARE.java
new file mode 100644
index 000000000..886079a23
--- /dev/null
+++ b/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/PiwikStatsDB_B2SHARE.java
@@ -0,0 +1,304 @@
+
+package eu.dnetlib.oa.graph.usagerawdata.export;
+
+import java.io.*;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author D. Pierrakos, S. Zoupanos
+ */
+public class PiwikStatsDB_B2SHARE {
+
+	private String logPath;
+	private String logRepoPath;
+	private String logPortalPath;
+
+	private Statement stmt = null;
+
+	private static final Logger logger = LoggerFactory.getLogger(PiwikStatsDB_B2SHARE.class);
+
+	private String CounterRobotsURL;
+	private ArrayList<String> robotsList;
+
+	public PiwikStatsDB_B2SHARE(String logRepoPath, String logPortalPath) throws Exception {
+		this.logRepoPath = logRepoPath;
+		this.logPortalPath = logPortalPath;
+
+	}
+
+	public ArrayList<String> getRobotsList() {
+		return robotsList;
+	}
+
+	public void setRobotsList(ArrayList<String> robotsList) {
+		this.robotsList = robotsList;
+	}
+
+	public String getCounterRobotsURL() {
+		return CounterRobotsURL;
+	}
+
+	public void setCounterRobotsURL(String CounterRobotsURL) {
+		this.CounterRobotsURL = CounterRobotsURL;
+	}
+
+	public void processB2SHARELogs() throws Exception {
+		try {
+
+			logger.info("Processing B2SHARE logs");
+			processLog();
+			logger.info("B2SHARE logs process done");
+
+			logger.info("Removing double clicks from B2SHARE logs");
+			removeDoubleClicks();
+			logger.info("Removing double clicks from B2SHARE logs done");
+
+			logger.info("Updating Production Tables");
+			updateProdTables();
+			logger.info("Updated Production Tables");
+
+		} catch (Exception e) {
+			logger.error("Failed to process logs: " + e);
+			throw new Exception("Failed to process logs: " + e.toString(), e);
+		}
+	}
+
+	public void processLog() throws Exception {
+
+		Statement stmt = ConnectDB.getHiveConnection().createStatement();
+		ConnectDB.getHiveConnection().setAutoCommit(false);
+
+		logger.info("Adding JSON Serde jar");
+		stmt.executeUpdate("add jar /usr/share/cmf/common_jars/hive-hcatalog-core-1.1.0-cdh5.14.0.jar");
+		logger.info("Added JSON Serde jar");
+
+		logger.info("Dropping piwiklog_b2share_tmp_json table");
+		String drop_piwiklogtmp_json = "DROP TABLE IF EXISTS "
+			+ ConnectDB.getUsageStatsDBSchema()
+			+ ".piwiklog_b2share_tmp_json";
+		stmt.executeUpdate(drop_piwiklogtmp_json);
+		logger.info("Dropped piwiklog_b2share_tmp_json table");
+
+		logger.info("Creating piwiklog_b2share_tmp_json");
+		String create_piwiklogtmp_json = "CREATE EXTERNAL TABLE IF NOT EXISTS "
+			+ ConnectDB.getUsageStatsDBSchema()
+			+ ".piwiklog_b2share_tmp_json(\n"
+			+ "	`idSite` STRING,\n"
+			+ "	`idVisit` STRING,\n"
+			+ "	`country` STRING,\n"
+			+ "	`referrerName` STRING,\n"
+			+ "	`browser` STRING,\n"
+			+ "	`actionDetails` ARRAY<\n"
+			+ "		struct<\n"
+			+ "			type: STRING,\n"
+			+ "			url: STRING,\n"
+			+ "			eventAction: STRING,\n"
+			+ "			eventName: STRING,\n"
+			+ "			timestamp: String\n"
+			+ "			>\n"
+			+ "		>\n"
+			+ ")\n"
+			+ "ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'\n"
+			+ "LOCATION '" + ExecuteWorkflow.repoLogPath + "'\n"
+			+ "TBLPROPERTIES (\"transactional\"=\"false\")";
+		stmt.executeUpdate(create_piwiklogtmp_json);
+		logger.info("Created piwiklog_b2share_tmp_json");
+
+		logger.info("Dropping piwiklogtmp table");
+		String drop_piwiklogtmp = "DROP TABLE IF EXISTS "
+			+ ConnectDB.getUsageStatsDBSchema()
+			+ ".piwiklogtmp";
+		stmt.executeUpdate(drop_piwiklogtmp);
+		logger.info("Dropped piwiklogtmp");
+
+		logger.info("Creating piwiklogb2sharetmp");
+		String create_piwiklogtmp = "CREATE TABLE "
+			+ ConnectDB.getUsageStatsDBSchema()
+			+ ".piwiklogb2sharetmp (source BIGINT, id_Visit STRING, country STRING, action STRING, url STRING, "
+			+ "entity_id STRING, source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) "
+			+ "clustered by (source) into 100 buckets stored as orc tblproperties('transactional'='true')";
+		stmt.executeUpdate(create_piwiklogtmp);
+		logger.info("Created piwiklogb2sharetmp");
+
+		logger.info("Inserting into piwiklogb2sharetmp");
+		String insert_piwiklogtmp = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogb2sharetmp "
+			+ "SELECT DISTINCT cast(idSite as BIGINT) as source, idVisit as id_Visit, country, "
+			+ "actiondetail.eventAction as action, actiondetail.url as url, "
+			+ "actiondetail.eventName as entity_id, "
+			+ "'repItem' as source_item_type, from_unixtime(cast(actiondetail.timestamp as BIGINT)) as timestamp, "
+			+ "referrerName as referrer_name, browser as agent\n"
+			+ "FROM " + ConnectDB.getUsageStatsDBSchema() + ".piwiklog_b2share_tmp_json\n"
+			+ "LATERAL VIEW explode(actiondetails) actiondetailsTable AS actiondetail";
+		stmt.executeUpdate(insert_piwiklogtmp);
+		logger.info("Inserted into piwiklogb2sharetmp");
+
+		stmt.close();
+	}
+
+	public void removeDoubleClicks() throws Exception {
+		Statement stmt = ConnectDB.getHiveConnection().createStatement();
+		ConnectDB.getHiveConnection().setAutoCommit(false);
+
+		logger.info("Cleaning download double clicks");
+		// clean download double clicks
+		String sql = "DELETE from " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogb2sharetmp "
+			+ "WHERE EXISTS (\n"
+			+ "SELECT DISTINCT p1.source, p1.id_visit, p1.action, p1.entity_id, p1.timestamp \n"
+			+ "FROM " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogb2sharetmp p1, "
+			+ ConnectDB.getUsageStatsDBSchema() + ".piwiklogb2sharetmp p2\n"
+			+ "WHERE p1.source=p2.source AND p1.id_visit=p2.id_visit AND p1.entity_id=p2.entity_id \n"
+			+ "AND p1.action=p2.action AND p1.action='download' AND p1.timestamp!=p2.timestamp \n"
+			+ "AND p1.timestamp
[...]
+	private ArrayList<String> listHdfsDir(String dir) throws Exception {
+
+		FileSystem hdfs = FileSystem.get(new Configuration());
+		RemoteIterator<LocatedFileStatus> Files;
+		ArrayList<String> fileNames = new ArrayList<>();
+
+		try {
+			Path exportPath = new Path(hdfs.getUri() + dir);
+			Files = hdfs.listFiles(exportPath, false);
+			while (Files.hasNext()) {
+				String fileName = Files.next().getPath().toString();
+				fileNames.add(fileName);
+			}
+
+			hdfs.close();
+		} catch (Exception e) {
+			logger.error("HDFS file path with exported data does not exist : " + new Path(hdfs.getUri() + logPath));
+			throw new Exception("HDFS file path with exported data does not exist : " + logPath, e);
+		}
+
+		return fileNames;
+	}
+
+	private String readHDFSFile(String filename) throws Exception {
+		String result;
+		try {
+
+			FileSystem fs = FileSystem.get(new Configuration());
+			// log.info("reading file : " + filename);
+
+			BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(filename))));
+
+			StringBuilder sb = new StringBuilder();
+			String line = br.readLine();
+
+			while (line != null) {
+				if (!line.equals("[]")) {
+					sb.append(line);
+				}
+				// sb.append(line);
+				line = br.readLine();
+			}
+			result = sb.toString().replace("][{\"idSite\"", ",{\"idSite\"");
+			if (result.equals("")) {
+				result = "[]";
+			}
+
+			// fs.close();
+		} catch (Exception e) {
+			logger.error(e.getMessage());
+			throw new Exception(e);
+		}
+
+		return result;
+	}
+
+	private Connection getConnection() throws SQLException {
+		return ConnectDB.getHiveConnection();
+	}
+
+	public void createPedocsOldUsageData() throws SQLException {
+		Statement stmt = ConnectDB.getHiveConnection().createStatement();
+		ConnectDB.getHiveConnection().setAutoCommit(false);
+
+		logger.info("Creating PeDocs Old Views Table");
+		String sql = "Create TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
+			+ ".pedocsoldviews as select * from default.pedocsviews";
+		stmt.executeUpdate(sql);
+		logger.info("PeDocs Old Views Table created");
+
+		logger.info("Creating PeDocs Old Downloads Table");
+		sql = "Create TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
+			+ ".pedocsolddownloads as select * from default.pedocsdownloads";
+		stmt.executeUpdate(sql);
+		logger.info("PeDocs Old Downloads Table created");
+
+	}
+
+	public void createDatasetsUsageData() throws SQLException {
+		Statement stmt = ConnectDB.getHiveConnection().createStatement();
+		ConnectDB.getHiveConnection().setAutoCommit(false);
+
+		logger.info("Creating Datasets Views Table");
+		String sql = "Create TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
+			+ ".datacite_views as select * from datasetsusagestats_20210301.datacite_views";
+		stmt.executeUpdate(sql);
+		logger.info("Datasets Views Table created");
+
+		logger.info("Creating Datasets Downloads Table");
+		sql = "Create TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
+			+ ".datacite_downloads as select * from datasetsusagestats_20210301.datacite_downloads";
+		stmt.executeUpdate(sql);
+		logger.info("Datasets Downloads Table created");
+
+	}
+}