Skip to content
GitLab
Explore
Sign in
Register
Primary navigation
Search or go to…
Project
workspaces
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package Registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Terms and privacy
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
ssa
workspaces
Merge requests
!362
Post Apocalypse Round 5 - restore & image
Code
Review changes
Check out branch
Download
Patches
Plain diff
Merged
Post Apocalypse Round 5 - restore & image
testing_barrage_fixes
into
main
Overview
0
Commits
7
Pipelines
2
Changes
4
Merged
Charlotte Hausman
requested to merge
testing_barrage_fixes
into
main
3 years ago
Overview
0
Commits
7
Pipelines
2
Changes
17
Expand
0
0
Merge request reports
Compare
version 1
version 1
d589f6e6
3 years ago
main (base)
and
latest version
latest version
8adc2ab3
7 commits,
3 years ago
version 1
d589f6e6
3 commits,
3 years ago
Show latest version
17 files
+
1485
−
858
Inline
Compare changes
Side-by-side
Inline
Show whitespace changes
Show one file at a time
Files
17
Search (e.g. *.vue) (Ctrl+P)
apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingestion_manifest.py
+
393
−
13
Options
"""
This is the entrypoint for ingestion launching
"""
import
abc
import
json
import
logging
import
re
import
sys
import
tarfile
from
pathlib
import
Path
# pylint: disable=R0903
# pylint: disable=E0401, R0903, R1721
from
typing
import
Tuple
,
List
import
pendulum
from
pendulum
import
DateTime
from
ingest_envoy.ingestion_manifest_writer
import
EvlaCalIngestionManifestWriter
from
ingest_envoy.utilities
import
ScienceProductType
from
ingest_envoy.manifest_components
import
(
MANIFEST_NAME_BASE
,
MANIFEST_NAME_EXT
,
ARTIFACT_NAME
,
ARTIFACT_EXT
,
WEBLOG
,
JSON
,
IngestionManifestKey
,
ManifestComponentIF
,
InputScienceProduct
,
InputGroup
,
ManifestParameters
,
OutputScienceProduct
,
AncillaryProduct
,
OutputGroup
,
SCIENCE_PRODUCT_PATTERN
,
)
from
ingest_envoy.utilities
import
(
ScienceProductType
,
Telescope
,
IngestionManifestException
,
AncillaryProductType
,
find_output_science_products
,
)
# Module-level logger for this file: INFO and above, echoed to stdout.
logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler(stream=sys.stdout))
logger.setLevel(level=logging.INFO)
# pylint: disable=R0902, R0913
class IngestionManifest:
    """
    needed for ingestion-launching interface
    """

    def __init__(self, staging_source_dir: str, ingestion_type: str, locator: str):
        """
        :param staging_source_dir: path of the staging directory; stored as a Path in ingest_path
        :param ingestion_type: science product type name, parsed with ScienceProductType.from_str
        :param locator: science product locator
        """
        self.ingest_path = Path(staging_source_dir)
        self.sp_type = ScienceProductType.from_str(ingestion_type)
        # The locator was accepted but never stored, although manifest creation reads
        # self.locator; keep it on the instance.
        self.locator = locator
class ManifestIF(ManifestComponentIF):
    """
    Common interface implemented by every ingestion manifest.
    """

    def __init__(
        self,
        telescope: Telescope,
        sp_type: ScienceProductType,
        staging_source_dir: Path,
        locator: str,
        # all except EVLA_EB and VLASS catalog manifest have input group
        input_group: InputGroup,
        # every manifest has at least one output group
        output_group: OutputGroup,
    ):
        """
        Store the manifest's components, build its parameters, and list the staging directory.

        :param telescope: telescope this manifest is for
        :param sp_type: type of the science product
        :param staging_source_dir: directory whose entries are recorded in files_found
        :param locator: science product locator
        :param input_group: input group of the manifest
        :param output_group: output group of the manifest
        """
        self.staging_source_dir = staging_source_dir
        self.sp_type = sp_type
        self.locator = locator
        self.input_group = input_group
        self.output_group = output_group
        self.telescope = telescope
        # build_ingest_parameters() is supplied by the concrete manifest class
        self.parameters = self.build_ingest_parameters()
        self.files_found = list(self.staging_source_dir.iterdir())

    @abc.abstractmethod
    def create(self):
        """
        Build and write the manifest, gathering whatever is needed from the ingestion path.

        :return:
        """

    @abc.abstractmethod
    def write(self):
        """
        Write this manifest to a file at the ingest path, along with the artifacts tar and any
        other files this type of ingestion requires.

        :return:
        """
        raise NotImplementedError

    def __eq__(self, other):
        # Only IngestionManifest instances can compare equal; equality means both groups match.
        if not isinstance(other, IngestionManifest):
            return False
        same_inputs = other.input_group == self.input_group
        return same_inputs and other.output_group == self.output_group

    @abc.abstractmethod
    def to_json(self) -> JSON:
        """
        Produce the JSON form of this manifest, suitable for writing to a file.

        :return:
        """
        raise NotImplementedError
class IngestionManifestBuilder:
    """
    Builds ingestion manifest and associated files from files in the staging directory
    """

    def __init__(
        self,
        staging_source_dir: Path,
        sp_type: ScienceProductType,
        locator: str,
        telescope: Telescope,
    ):
        """
        :param staging_source_dir: directory whose entries are the candidate ingestion files
        :param sp_type: type of the science product being ingested
        :param locator: science product locator, used for the input group
        :param telescope: telescope passed through to the manifest
        :raises IngestionManifestException: if the staging directory has no entries
        """
        self.telescope = telescope
        self.staging_source_dir = staging_source_dir
        self.sp_type = sp_type
        self.locator = locator
        # Generated on first use and then reused, so the manifest entry and the tar written
        # to disk carry the same (timestamped) filename.
        self._artifacts_filename = None
        self.files_found = list(staging_source_dir.iterdir())
        if not self.files_found:
            raise IngestionManifestException(f"No ingestion files found at {staging_source_dir}")

    def build(self) -> Tuple[ManifestIF, Path]:
        """
        Create the manifest, write it to the staging directory, and write the artifacts tar.

        :return: the ingestion manifest and the file containing its JSON
        """
        manifest = IngestionManifest(
            telescope=self.telescope,
            locator=self.locator,
            sp_type=self.sp_type,
            staging_source_dir=self.staging_source_dir,
            input_group=self._build_input_group(),
            output_group=self._build_output_group(),
        )

        # the manifest must be on disk before the tar is built: the tar includes it
        manifest_file = manifest.write()
        self.write_ingestion_artifacts_tar()

        return manifest, manifest_file

    def _build_input_group(self):
        """
        Create the input group using the parameters.

        :return: an InputGroup holding a single input science product
        """
        # N.B. this is sufficient for most types of ingestion,
        # but ALMA CALs will have multiple EB SPs, identified only by locator,
        # and VLBAs have no input group at all.
        sp_in = InputScienceProduct(sp_type=self.sp_type, locator=self.locator)
        return InputGroup([sp_in])

    def _define_output_science_products(self):
        """
        Wrap each science product file found in the staging directory.

        :return: list of OutputScienceProduct, one per file found
        """
        sp_files = find_output_science_products(self.files_found, self.staging_source_dir)
        return [OutputScienceProduct(self.sp_type, file.name) for file in sp_files]

    def _build_output_group(self) -> OutputGroup:
        """
        Create the output group using the parameters.

        :return: output science products plus ancillary products (including the artifacts tar)
        """
        # find ancillary products, if any
        ancillary_products = self._find_ancillary_products()

        artifacts_ap = AncillaryProduct(
            type=AncillaryProductType.PIPELINE_ARTIFACTS,
            filename=self._artifacts_tar_name(),
        )
        ancillary_products.append(artifacts_ap)

        return OutputGroup(self._define_output_science_products(), ancillary_products)

    def _artifacts_tar_name(self) -> str:
        """
        Return this builder's artifacts tar filename, generating it on first use.

        build_artifacts_filename() embeds the current time, so calling it separately for the
        manifest entry and for the tar itself yields two different names.

        :return: the filename
        """
        if self._artifacts_filename is None:
            self._artifacts_filename = self.build_artifacts_filename()
        return self._artifacts_filename

    @staticmethod
    def build_artifacts_filename() -> str:
        """
        Build unique artifacts filename in standard format.

        :return: the filename
        """
        timestamp = format_timestamp(pendulum.now())
        return f"{ARTIFACT_NAME}{timestamp}{ARTIFACT_EXT}"

    def write_ingestion_artifacts_tar(self) -> Path:
        """
        Tar up the regular files in the staging directory, plus the manifest.
        This happens in the staging area for ingestion.

        :return: path of the .tar archive of the ingestion artifacts
        :raises FileNotFoundError: from find_manifest, if no manifest has been written yet
        """
        # is_file must be *called*; the bare attribute is always truthy, which let
        # directories through the filter
        ingestion_files = [file for file in self.staging_source_dir.iterdir() if file.is_file()]
        manifest_file = find_manifest(self.staging_source_dir)

        ing_tar = self.staging_source_dir / self._artifacts_tar_name()
        with tarfile.open(ing_tar, "w") as ingestion_artifacts_tar:
            for file in ingestion_files:
                ingestion_artifacts_tar.add(file)

            # include the manifest
            if manifest_file not in ingestion_files:
                ingestion_artifacts_tar.add(manifest_file)

        return ing_tar

    def _find_ancillary_products(self) -> List[AncillaryProduct]:
        """
        Round up any ancillary files found in ingestion path

        :return: ancillary product(s) found
        """
        ancillary_products = []

        # if there's a weblog in here, grab it (only the first match is used)
        weblogs = [file for file in self.files_found if file.name.endswith(WEBLOG)]
        if weblogs:
            weblog_ap = AncillaryProduct(
                type=AncillaryProductType.PIPELINE_WEBLOG, filename=weblogs[0].name
            )
            ancillary_products.append(weblog_ap)

        # extend, not append: the helper returns a list, which must not be nested as one item
        # NOTE(review): the helper is annotated as returning Paths, not AncillaryProducts --
        # confirm the element type once non-EVLA_CAL ingestion is implemented
        ancillary_products.extend(self._find_additional_ingestion_files())

        return ancillary_products

    def _find_additional_ingestion_files(self) -> List[Path]:
        """
        Round up any other necessary ingestion file(s)

        :return: additional relevant files found in ingestion path, if any
        :raises NotImplementedError: for any science product type other than EVLA_CAL
        """
        if self.sp_type == ScienceProductType.EVLA_CAL:
            # there won't be any others
            return []

        # TODO when the time comes: we'll have extra information for other ingestion types;
        #  see archive-metaproject
        raise NotImplementedError
class IngestionManifest(ManifestIF):
    """
    needed for ingestion-launching interface
    """

    def build_ingest_parameters(self):
        """
        Make the "parameters" section of the manifest

        :return: ManifestParameters for this manifest
        :raises NotImplementedError: for any science product type other than EVLA_CAL
        """
        if self.sp_type != ScienceProductType.EVLA_CAL:
            raise NotImplementedError()

        return ManifestParameters(
            telescope=self.telescope,
            reingest=False,
            ngas_ingest=False,
            calibrate=False,
            staging_source_dir=self.staging_source_dir,
        )

    def _build_input_group(self):
        """
        Create the input group using the parameters.

        :return: an InputGroup holding a single input science product
        """
        # N.B. this is sufficient for most types of ingestion,
        # but ALMA CALs will have multiple EB SPs, identified only by locator,
        # and VLBAs have no input group at all.
        sp_in = InputScienceProduct(sp_type=self.sp_type, locator=self.locator)
        return InputGroup([sp_in])

    def _build_output_group(self) -> OutputGroup:
        """
        Create the output group using the parameters.

        :return: the science product tar plus the weblog, if one is present
        :raises FileNotFoundError: if the staging directory has no science product tar
        """
        sp_tar = self._find_science_product_tar()
        sps_out = [OutputScienceProduct(self.sp_type, sp_tar.name)]

        # No ancillary-product finder is defined on this class or on ManifestIF (the builder
        # has one), so the weblog is the only ancillary product looked for here.
        ancillary_products = []
        # ingestion_path is a plain method, so it cannot be used with "/"; the staging
        # directory is where the ingestion files live
        weblog = self.staging_source_dir / WEBLOG
        if weblog.exists():
            ancillary_products.append(
                AncillaryProduct(type=AncillaryProductType.PIPELINE_WEBLOG, filename=weblog.name)
            )

        # the ancillary products were previously collected and then dropped
        return OutputGroup(sps_out, ancillary_products)

    def ingestion_path(self) -> Path:
        """
        Accessor for the ingestion path held by the manifest parameters.
        This is a method, not a property: it must be called.

        :return: the parameters' ingestion path
        """
        # NOTE(review): relies on ManifestParameters exposing `ingestion_path` -- confirm
        return self.parameters.ingestion_path

    def write(self) -> Path:
        """
        Write the manifest .json file into the staging directory.

        :return: path of the file written
        """
        output_path = self.staging_source_dir / build_manifest_filename()

        to_write = json.dumps(self.to_json(), indent=4)
        with open(output_path, "w", encoding="utf-8") as out:
            out.write(to_write)

        return output_path

    def create(self):
        """
        Build the manifest and its associated files via IngestionManifestBuilder.

        :return:
        :raises NotImplementedError: for any science product type other than EVLA_CAL
        """
        if self.sp_type != ScienceProductType.EVLA_CAL:
            raise NotImplementedError(
                f"Don't yet know how to handle {self.sp_type.value} ingestion"
            )

        builder = IngestionManifestBuilder(
            staging_source_dir=Path(self.staging_source_dir),
            sp_type=self.sp_type,
            locator=self.locator,
            telescope=self.telescope,
        )
        builder.build()

    def to_json(self) -> JSON:
        """
        Turn this object into a JSON-serializable mapping suitable for writing to a file

        :return: dict with the locator, parameters, ingestion path, input and output groups
        """
        attributes = vars(self)

        return {
            "locator": attributes["locator"],
            IngestionManifestKey.PARAMETERS.value: self.build_ingest_parameters().to_json(),
            # str(self.ingestion_path) was the repr of a bound method, not a path; the
            # manifest lives in (and describes) the staging directory
            IngestionManifestKey.INGESTION_PATH.value: str(self.staging_source_dir),
            IngestionManifestKey.INPUT_GROUP.value: attributes[
                IngestionManifestKey.INPUT_GROUP.value
            ].to_json(),
            IngestionManifestKey.OUTPUT_GROUP.value: attributes[
                IngestionManifestKey.OUTPUT_GROUP.value
            ].to_json(),
        }

    def _find_science_product_tar(self) -> Path:
        """
        A calibration ingestion staging dir should have ONE science product tar; ignore any others

        :return: the first regular file whose name matches SCIENCE_PRODUCT_PATTERN
        :raises FileNotFoundError: if no such file exists
        """
        for file in self.staging_source_dir.iterdir():
            # is_file must be called; the bare attribute is always truthy
            if file.is_file() and re.match(SCIENCE_PRODUCT_PATTERN, file.name):
                return file

        raise FileNotFoundError(f"no science product found at {self.staging_source_dir}")
def format_timestamp(datetime: DateTime) -> str:
    """
    Format a timestamp as, e.g., 2021_07_01T13_49_17.237

    :param datetime: current timestamp
    :return: timestamp suitable for ingestion manifest filename
    """
    # HH is the zero-padded 24-hour token. The previous "hh" is the 12-hour token, which
    # (with no AM/PM marker) made afternoon names collide with morning ones and contradicted
    # the documented example.
    return datetime.format("YYYY_MM_DDTHH_mm_ss.SSS")
def build_manifest_filename() -> str:
    """
    Produce a manifest filename in the standard format, stamped with the current time.

    :return: the filename
    """
    stamp = format_timestamp(pendulum.now())
    return f"{MANIFEST_NAME_BASE}{stamp}{MANIFEST_NAME_EXT}"
def find_manifest(ingestion_path: Path) -> Path:
    """
    Locate the ingestion manifest in the given directory.

    :param ingestion_path: home of ingestion files
    :return: the first entry whose name has the manifest prefix and extension
    :raises FileNotFoundError: if the directory holds no such entry
    """
    candidates = (
        entry
        for entry in ingestion_path.iterdir()
        if entry.name.startswith(MANIFEST_NAME_BASE) and entry.name.endswith(MANIFEST_NAME_EXT)
    )
    manifest = next(candidates, None)
    if manifest is None:
        raise FileNotFoundError(f"No ingestion manifest found at {ingestion_path}")
    return manifest
Loading