SFR-2476: Implement DSpace service and update DOAB ingest #531

Merged: 7 commits, Jan 31, 2025
175 changes: 46 additions & 129 deletions processes/ingest/doab.py
@@ -1,23 +1,18 @@
from datetime import datetime, timedelta, timezone
from io import BytesIO
from lxml import etree
import os
import requests

from constants.get_constants import get_constants
from services import DSpaceService
from ..core import CoreProcess
from logger import create_log
from mappings.doab import DOABMapping
from mappings.base_mapping import MappingError
from managers import DOABLinkManager, RabbitMQManager, S3Manager
from managers import DOABLinkManager, S3Manager, RabbitMQManager
from model import get_file_message


logger = create_log(__name__)


class DOABProcess(CoreProcess):
DOAB_BASE_URL = 'https://directory.doabooks.org/oai/request?'
ROOT_NAMESPACE = {None: 'http://www.openarchives.org/OAI/2.0/'}
DOAB_IDENTIFIER = 'oai:directory.doabooks.org'

OAI_NAMESPACES = {
'oai_dc': 'http://www.openarchives.org/OAI/2.0/oai_dc/',
@@ -30,145 +25,67 @@ class DOABProcess(CoreProcess):
def __init__(self, *args):
super(DOABProcess, self).__init__(*args[:4])

self.ingestOffset = int(args[5]) if args[5] else 0
self.ingestLimit = (int(args[4]) + self.ingestOffset) if args[4] else 10000

self.generateEngine()
self.createSession()

self.s3_manager = S3Manager()
self.s3_manager.createS3Client()
self.s3Bucket = os.environ['FILE_BUCKET']
self.s3_bucket = os.environ['FILE_BUCKET']

self.fileQueue = os.environ['FILE_QUEUE']
self.fileRoute = os.environ['FILE_ROUTING_KEY']
self.file_queue = os.environ['FILE_QUEUE']
self.file_route = os.environ['FILE_ROUTING_KEY']

self.rabbitmq_manager = RabbitMQManager()
self.rabbitmq_manager.createRabbitConnection()
self.rabbitmq_manager.createOrConnectQueue(self.fileQueue, self.fileRoute)
self.rabbitmq_manager.createOrConnectQueue(
self.file_queue, self.file_route)

self.offset = int(args[5]) if args[5] else 0
self.limit = (int(args[4]) + self.offset) if args[4] else 10000

self.constants = get_constants()
self.dspace_service = DSpaceService(base_url=self.DOAB_BASE_URL, source_mapping=DOABMapping)

def runProcess(self):
if self.process == 'daily':
self.importOAIRecords()
elif self.process == 'complete':
self.importOAIRecords(fullOrPartial=True)
elif self.process == 'custom':
self.importOAIRecords(startTimestamp=self.ingestPeriod)
elif self.process == 'single':
self.importSingleOAIRecord(self.singleRecord)
try:
self.generateEngine()
self.createSession()

self.saveRecords()
self.commitChanges()
records = []

logger.info(f'Ingested {len(self.records)} DOAB records')
if self.process == 'daily':
records = self.dspace_service.get_records(offset=self.offset, limit=self.limit)
elif self.process == 'complete':
records = self.dspace_service.get_records(full_import=True, offset=self.offset, limit=self.limit)
elif self.process == 'custom':
records = self.dspace_service.get_records(start_timestamp=self.ingestPeriod, offset=self.offset, limit=self.limit)
elif self.singleRecord:
record = self.dspace_service.get_single_record(record_id=self.singleRecord, source_identifier=self.DOAB_IDENTIFIER)
self.manage_links(record)

def parseDOABRecord(self, oaiRec):
try:
doabRec = DOABMapping(oaiRec, self.OAI_NAMESPACES, self.constants)
doabRec.applyMapping()
except MappingError as e:
raise DOABError(e.message)
if records:
for record in records:
self.manage_links(record)

self.saveRecords()
self.commitChanges()

logger.info(f'Ingested {len(self.records) if records else 1} DOAB records')

Contributor: Shouldn't the result of the else conditional be 0 if the records array is empty?

Contributor (Author): Yes, this should still return 0 since the len of the records array would be 0 if it is empty.
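
For reference, a minimal sketch of how Python evaluates the conditional expression in the logging line above (the list values are illustrative):

records = []
print(len(records) if records else 1)  # an empty list is falsy, so the else branch runs and prints 1
records = ['record_a', 'record_b']
print(len(records) if records else 1)  # a non-empty list is truthy, so this prints 2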

linkManager = DOABLinkManager(doabRec.record)
except Exception as e:
logger.exception('Failed to run DOAB process')
raise e
finally:
self.close_connection()

def manage_links(self, record):
linkManager = DOABLinkManager(record.record)

linkManager.parseLinks()

for manifest in linkManager.manifests:
manifestPath, manifestJSON = manifest
self.s3_manager.createManifestInS3(manifestPath, manifestJSON, self.s3Bucket)
self.s3_manager.createManifestInS3(
manifestPath, manifestJSON, self.s3_bucket)

for epubLink in linkManager.ePubLinks:
ePubPath, ePubURI = epubLink
self.rabbitmq_manager.sendMessageToQueue(self.fileQueue, self.fileRoute, get_file_message(ePubURI, ePubPath))

self.addDCDWToUpdateList(doabRec)

def importSingleOAIRecord(self, recordID):
urlParams = 'verb=GetRecord&metadataPrefix=oai_dc&identifier=oai:directory.doabooks.org:{}'.format(recordID)
doabURL = '{}{}'.format(self.DOAB_BASE_URL, urlParams)

doabResponse = requests.get(doabURL, timeout=30)

if doabResponse.status_code == 200:
content = BytesIO(doabResponse.content)
oaidcXML = etree.parse(content)
oaidcRecord = oaidcXML.xpath('//oai_dc:dc', namespaces=self.OAI_NAMESPACES)[0]

try:
self.parseDOABRecord(oaidcRecord)
except DOABError as e:
logger.error(f'Error parsing DOAB record {oaidcRecord}')

def importOAIRecords(self, fullOrPartial=False, startTimestamp=None):
resumptionToken = None

recordsProcessed = 0
while True:
oaiFile = self.downloadOAIRecords(fullOrPartial, startTimestamp, resumptionToken=resumptionToken)

resumptionToken = self.getResumptionToken(oaiFile)

if recordsProcessed < self.ingestOffset:
recordsProcessed += 100
continue

oaidcRecords = etree.parse(oaiFile)

for record in oaidcRecords.xpath('//oai_dc:dc', namespaces=self.OAI_NAMESPACES):
if record is None: continue

try:
self.parseDOABRecord(record)
except DOABError as e:
logger.error(f'Error parsing DOAB record {record}')

recordsProcessed += 1

if recordsProcessed >= self.ingestLimit:
break

if not resumptionToken or recordsProcessed >= self.ingestLimit:
break

def getResumptionToken(self, oaiFile):
try:
oaiXML = etree.parse(oaiFile)
return oaiXML.find('.//resumptionToken', namespaces=self.ROOT_NAMESPACE).text
except AttributeError:
return None

def downloadOAIRecords(self, fullOrPartial, startTimestamp, resumptionToken=None):
headers = {
# Pass a user-agent header to prevent 403 unauthorized responses from DOAB
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
}

urlParams = 'verb=ListRecords'
if resumptionToken:
urlParams = '{}&resumptionToken={}'.format(urlParams, resumptionToken)
elif fullOrPartial is False:
if not startTimestamp:
startTimestamp = (datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(hours=24)).strftime('%Y-%m-%d')
urlParams = '{}&metadataPrefix=oai_dc&from={}'.format(urlParams, startTimestamp)
else:
urlParams = '{}&metadataPrefix=oai_dc'.format(urlParams)

doabURL = '{}{}'.format(self.DOAB_BASE_URL, urlParams)

doabResponse = requests.get(doabURL, stream=True, timeout=30, headers=headers)

if doabResponse.status_code == 200:
content = bytes()

for chunk in doabResponse.iter_content(1024 * 100): content += chunk

return BytesIO(content)

raise DOABError(f'Received {doabResponse.status_code} status code from {doabURL}')

self.rabbitmq_manager.sendMessageToQueue(self.file_queue, self.file_route, get_file_message(ePubURI, ePubPath))

class DOABError(Exception):
def __init__(self, message):
self.message = message
self.addDCDWToUpdateList(record)
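
Taken together, the refactored flow reduces to roughly the following sketch, based on the signatures in this diff; the offset, limit, and record_id values are illustrative:

from mappings.doab import DOABMapping
from services import DSpaceService

# Instantiate the service with the DOAB OAI-PMH endpoint and mapping from this PR.
service = DSpaceService(
    base_url='https://directory.doabooks.org/oai/request?',
    source_mapping=DOABMapping,
)

# Batch import: pages through OAI-PMH ListRecords responses.
records = service.get_records(offset=0, limit=100)

# Single-record import: issues an OAI-PMH GetRecord request.
record = service.get_single_record(
    record_id='12345',  # hypothetical DOAB record id
    source_identifier='oai:directory.doabooks.org',
)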
1 change: 1 addition & 0 deletions services/__init__.py
@@ -1,3 +1,4 @@
from .sources.nypl_bib_service import NYPLBibService
from .sources.publisher_backlist_service import PublisherBacklistService
from .google_drive_service import GoogleDriveService
from .sources.dspace_service import DSpaceService
126 changes: 126 additions & 0 deletions services/sources/dspace_service.py
@@ -0,0 +1,126 @@
from datetime import datetime, timedelta, timezone
from typing import Optional
import requests
from io import BytesIO
from lxml import etree
from constants.get_constants import get_constants
from logger import create_log
from mappings.base_mapping import MappingError
from mappings.xml import XMLMapping
from .source_service import SourceService

logger = create_log(__name__)


class DSpaceService(SourceService):
ROOT_NAMESPACE = {None: 'http://www.openarchives.org/OAI/2.0/'}
OAI_NAMESPACES = {
'oai_dc': 'http://www.openarchives.org/OAI/2.0/oai_dc/',
'dc': 'http://purl.org/dc/elements/1.1/',
'datacite': 'https://schema.datacite.org/meta/kernel-4.1/metadata.xsd',
'oapen': 'http://purl.org/dc/elements/1.1/',
'oaire': 'https://raw.githubusercontent.com/rcic/openaire4/master/schemas/4.0/oaire.xsd'
}

def __init__(self, base_url, source_mapping: type[XMLMapping]):
self.constants = get_constants()

self.base_url = base_url
self.source_mapping = source_mapping

def get_records(self, full_import=False, start_timestamp=None, offset: Optional[int]=None, limit: Optional[int]=None):
resumption_token = None

records_processed = 0
mapped_records = []
while resumption_token is not None or records_processed < offset:
oai_file = self.download_records(
full_import, start_timestamp, resumption_token=resumption_token)

resumption_token = self.get_resumption_token(oai_file)

if records_processed < offset:
records_processed += 100
continue

oaidc_records = etree.parse(oai_file)

for record in oaidc_records.xpath('//oai_dc:dc', namespaces=self.OAI_NAMESPACES):
if record is None:
continue

try:
parsed_record = self.parse_record(record)
mapped_records.append(parsed_record)
except Exception as e:
logger.error(f'Error parsing DSpace record {record}')

records_processed += 1

if limit is not None and records_processed >= limit:
return mapped_records

return mapped_records

def parse_record(self, record):
try:
record = self.source_mapping(record, self.OAI_NAMESPACES, self.constants)
record.applyMapping()
return record
except MappingError as e:
raise Exception(e.message)

def get_single_record(self, record_id, source_identifier):
url = f'{self.base_url}verb=GetRecord&metadataPrefix=oai_dc&identifier={source_identifier}:{record_id}'

response = requests.get(url, timeout=30)

if response.status_code == 200:
content = BytesIO(response.content)
oaidc_XML = etree.parse(content)
oaidc_record = oaidc_XML.xpath('//oai_dc:dc', namespaces=self.OAI_NAMESPACES)[0]

try:
parsed_record = self.parse_record(oaidc_record)
return parsed_record
except Exception as e:
logger.error(f'Error parsing DSpace record {oaidc_record}')

def get_resumption_token(self, oai_file):
try:
oai_XML = etree.parse(oai_file)
return oai_XML.find('.//resumptionToken', namespaces=self.ROOT_NAMESPACE).text
except AttributeError:
return None

def download_records(self, full_import, start_timestamp, resumption_token=None):
headers = {
# Pass a user-agent header to prevent 403 unauthorized responses from DSpace
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
}

url_params = 'verb=ListRecords'
if resumption_token:
url_params = f'{url_params}&resumptionToken={resumption_token}'
elif full_import is False:
if not start_timestamp:
start_timestamp = (datetime.now(timezone.utc).replace(
tzinfo=None) - timedelta(hours=24)).strftime('%Y-%m-%d')
url_params = f'{url_params}&metadataPrefix=oai_dc&from={start_timestamp}'
else:
url_params = f'{url_params}&metadataPrefix=oai_dc'

url = f'{self.base_url}{url_params}'

response = requests.get(url, stream=True, timeout=30, headers=headers)

if response.status_code == 200:
content = bytes()

for chunk in response.iter_content(1024 * 100):
content += chunk

return BytesIO(content)

raise Exception(
f'Received {response.status_code} status code from {url}')
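
As a self-contained illustration of the resumption-token handling above, this runs the same find() lookup against an inline OAI-PMH response; the token value is made up:

from io import BytesIO
from lxml import etree

ROOT_NAMESPACE = {None: 'http://www.openarchives.org/OAI/2.0/'}

# A truncated ListRecords response with only the parts that matter here.
sample = BytesIO(b'''<?xml version="1.0"?>
<OAI-PMH xmlns="http://www.openarchives.org/OAI/2.0/">
  <ListRecords>
    <resumptionToken>oai_dc/2025-01-30///100</resumptionToken>
  </ListRecords>
</OAI-PMH>''')

tree = etree.parse(sample)
token = tree.find('.//resumptionToken', namespaces=ROOT_NAMESPACE)
print(token.text if token is not None else None)  # prints: oai_dc/2025-01-30///100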