Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 66 additions & 9 deletions irods/data_object.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
"""
Interface for iRODS data objects.

Provides high level abstraction and POSIX-like facilities (create, open,
read/write) allowing clients to manipulate data objects very much as if they
were local files.
"""

import ast
import enum
import io
import sys
import logging
import os
import ast
import sys
from datetime import datetime, timezone

from irods.models import DataObject
from irods.meta import iRODSMetaCollection
import irods.keywords as kw
from irods.api_number import api_number
from irods.message import JSON_Message, iRODSMessage
from irods.meta import iRODSMetaCollection
from irods.models import DataObject

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -41,11 +51,57 @@ def __repr__(self):
return "<{}.{} {}>".format(self.__class__.__module__, self.__class__.__name__, self.resource_name)


class _repl_status(enum.Enum): # noqa: N801
STALE_REPLICA, GOOD_REPLICA, INTERMEDIATE_REPLICA, READ_LOCKED, WRITE_LOCKED = range(5)


# An ordering of the various replica status values, by descending fitness for use/interface
_REPL_STATUSES = tuple(
getattr(_repl_status, ident).value
for ident in (
"GOOD_REPLICA",
"STALE_REPLICA",
"INTERMEDIATE_REPLICA",
"READ_LOCKED",
"WRITE_LOCKED",
)
)

# An appropriate reference datetime value for gauging replica age as part of
# the default sort key in PRC4 and onward.
_REFERENCE_DATETIME = datetime.fromtimestamp(0, timezone.utc)

# ruff: noqa: D103 off

# Key functions to dictate how replica row results will be sorted within an iRODSDataObject.


def REPLICA_NUMBER_SORT_KEY_FN(row): # noqa: N802
return row[DataObject.replica_number]


def REPLICA_FITNESS_SORT_KEY_FN(row): # noqa: N802
repl_status = int(row[DataObject.replica_status])

repl_status_rank = _REPL_STATUSES.index(repl_status) if _REPL_STATUSES.count(repl_status) else sys.maxsize

return (repl_status_rank, _REFERENCE_DATETIME - row[DataObject.modify_time])


# ruff: noqa: D103 on

_DEFAULT_SORT_KEY_FN = REPLICA_NUMBER_SORT_KEY_FN
Comment on lines +79 to +93
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have any documentation around these?
Do the functions which accept them document them explicitly?
How do users learn about these?



class iRODSDataObject:
def __init__(self, manager, parent=None, results=None):
# iRODSDataObject's constructor is not usually directly accessed by iRODS client applications. See the main README.
# ruff: noqa: D107 off

def __init__(self, manager, parent=None, results=None, replica_sort_function=None):
self.manager = manager
if parent and results:
self.collection = parent
results = sorted(results, key=(replica_sort_function or _DEFAULT_SORT_KEY_FN))
for attr, value in DataObject.__dict__.items():
if not attr.startswith("_"):
try:
Expand All @@ -54,9 +110,8 @@ def __init__(self, manager, parent=None, results=None):
# backward compatibility with older schema versions
pass
self.path = self.collection.path + "/" + self.name
replicas = sorted(results, key=lambda r: r[DataObject.replica_number])

# The status quo before iRODS 5
# Copy pre-iRODS 5 fields

replica_args = [
(
Expand All @@ -75,18 +130,20 @@ def __init__(self, manager, parent=None, results=None):
modify_time=r[DataObject.modify_time],
),
)
for r in replicas
for r in results
]

# Adjust for adding access_time in the iRODS 5 case.

if self.manager.sess.server_version >= (5,):
for n, r in enumerate(replicas):
for n, r in enumerate(results):
replica_args[n][1]['access_time'] = r[DataObject.access_time]
self.replicas = [iRODSReplica(*a, **k) for a, k in replica_args]

self._meta = None

# ruff: noqa: D107 off

def __repr__(self):
return f"<iRODSDataObject {self.id} {self.name}>"

Expand Down
42 changes: 25 additions & 17 deletions irods/manager/data_object_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,27 +218,29 @@ def should_parallelize_transfer(
if size is not None and isinstance(open_options, dict):
open_options[kw.DATA_SIZE_KW] = size

def _download(self, obj, local_path, num_threads, updatables=(), **options):
def _download(self, obj_path, local_path, num_threads, updatables=(), **options):
"""Transfer the contents of a data object to a local file.

Called from get() when a local path is named.
"""
if os.path.isdir(local_path):
local_file = os.path.join(local_path, irods_basename(obj))
else:
local_file = local_path

local_file = (
os.path.join(local_path, irods_basename(obj_path)) # noqa: PTH118
if os.path.isdir(local_path) # noqa: PTH112
else local_path
)

# Check for force flag if local_file exists
if os.path.exists(local_file) and kw.FORCE_FLAG_KW not in options:
raise ex.OVERWRITE_WITHOUT_FORCE_FLAG

data_open_returned_values_ = {}
with self.open(obj, "r", returned_values=data_open_returned_values_, **options) as o:
with self.open(obj_path, "r", returned_values=data_open_returned_values_, **options) as o:
if self.should_parallelize_transfer(num_threads, o, open_options=options.items()):
error = RuntimeError("parallel get failed")
try:
if not self.parallel_get(
(obj, o),
(obj_path, o),
local_file,
num_threads=num_threads,
target_resource_name=options.get(kw.RESC_NAME_KW, ""),
Expand All @@ -265,6 +267,8 @@ def get(self, path, local_path=None, num_threads=DEFAULT_NUMBER_OF_THREADS, upda
"""
parent = self.sess.collections.get(irods_dirname(path))

replica_sort_function = options.pop('replica_sort_function', None)

# TODO: optimize
if local_path:
self._download(path, local_path, num_threads=num_threads, updatables=updatables, **options)
Expand All @@ -284,7 +288,7 @@ def get(self, path, local_path=None, num_threads=DEFAULT_NUMBER_OF_THREADS, upda
results = query.all() # get up to max_rows replicas
if len(results) <= 0:
raise ex.DataObjectDoesNotExist()
return iRODSDataObject(self, parent, results)
return iRODSDataObject(self, parent, results, replica_sort_function=replica_sort_function)

@staticmethod
def _resolve_force_put_option(options, default_setting=None, true_value=""):
Expand Down Expand Up @@ -317,23 +321,25 @@ def put(
self._resolve_force_put_option(options, default_setting=client_config.data_objects.force_put_by_default)

if self.sess.collections.exists(irods_path):
obj = iRODSCollection.normalize_path(irods_path, os.path.basename(local_path))
obj_path = iRODSCollection.normalize_path(irods_path, os.path.basename(local_path)) # noqa: PTH119
else:
obj = irods_path
if kw.FORCE_FLAG_KW not in options and self.exists(obj):
obj_path = irods_path
if kw.FORCE_FLAG_KW not in options and self.exists(obj_path):
raise ex.OVERWRITE_WITHOUT_FORCE_FLAG
options.pop(kw.FORCE_FLAG_KW, None)

replica_sort_function = options.pop('replica_sort_function', None)

with open(local_path, "rb") as f:
sizelist = []
if self.should_parallelize_transfer(num_threads, f, measured_obj_size=sizelist, open_options=options):
o = deferred_call(self.open, (obj, "w"), options)
o = deferred_call(self.open, (obj_path, "w"), options)
f.close()
error = RuntimeError("parallel put failed")
try:
if not self.parallel_put(
local_path,
(obj, o),
(obj_path, o),
total_bytes=sizelist[0],
num_threads=num_threads,
target_resource_name=options.get(kw.RESC_NAME_KW, "") or options.get(kw.DEST_RESC_NAME_KW, ""),
Expand All @@ -346,7 +352,7 @@ def put(
except BaseException as e:
raise error from e
else:
with self.open(obj, "w", **options) as o:
with self.open(obj_path, "w", **options) as o:
# Set operation type to trigger acPostProcForPut
if kw.OPR_TYPE_KW not in options:
options[kw.OPR_TYPE_KW] = 1 # PUT_OPR
Expand All @@ -360,10 +366,11 @@ def put(
# Requested to register checksum without verifying, but source replica has a checksum. This can result
# in multiple replicas being marked good with different checksums, which is an inconsistency.
del repl_options[kw.REG_CHKSUM_KW]
self.replicate(obj, **repl_options)
self.replicate(obj_path, **repl_options)

if return_data_object:
return self.get(obj)
return self.get(obj_path, replica_sort_function=replica_sort_function)
return None

def chksum(self, path, **options):
"""
Expand Down Expand Up @@ -480,6 +487,7 @@ def create(
raise ex.DataObjectExistsAtLogicalPath

options = {**options, kw.DATA_TYPE_KW: "generic"}
replica_sort_function = options.pop('replica_sort_function', None)

if resource:
options[kw.DEST_RESC_NAME_KW] = resource
Expand Down Expand Up @@ -508,7 +516,7 @@ def create(
desc = response.int_info
conn.close_file(desc)

return self.get(path)
return self.get(path, replica_sort_function=replica_sort_function)

def open_with_FileRaw(self, *arg, **kw_options):
holder = []
Expand Down
Loading
Loading