pFad - Phone/Frame/Anonymizer/Declutterfier! Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

URL: http://github.com/apache/iceberg-python/commit/1a8420c4961c665f96558cd16af117d526ec6e62

Moving/Renameing hadoop module to filesystem (#277) · apache/iceberg-python@1a8420c · GitHub
Skip to content

Commit 1a8420c

Browse files
TGooch44 authored and Fokko committed
Moving/Renameing hadoop module to filesystem (#277)
1 parent 767f082 commit 1a8420c

25 files changed

Lines changed: 696 additions & 381 deletions

iceberg/core/avro/avro_schema_util.py

Lines changed: 7 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -6,14 +6,14 @@
66
# "License"); you may not use this file except in compliance
77
# with the License. You may obtain a copy of the License at
88
#
9-
# http://www.apache.org/licenses/LICENSE-2.0
9+
# http://www.apache.org/licenses/LICENSE-2.0
1010
#
11-
# Unless required by applicable law or agreed to in writing, software
12-
# distributed under the License is distributed on an "AS IS" BASIS,
13-
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14-
# See the License for the specific language governing permissions and
15-
# limitations under the License.
16-
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
1717

1818
class AvroSchemaUtil(object):
1919
FIELD_ID_PROP = "field-id"

iceberg/core/base_metastore_table_operations.py

Lines changed: 33 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -18,13 +18,11 @@
1818
import logging
1919
import uuid
2020

21-
from iceberg.core.hadoop import (get_fs,
22-
HadoopInputFile,
23-
HadoopOutputFile)
2421
from retrying import retry
2522

2623
from .table_metadata_parser import TableMetadataParser
2724
from .table_operations import TableOperations
25+
from .table_properties import TableProperties
2826

2927
_logger = logging.getLogger(__name__)
3028

@@ -46,6 +44,7 @@ def __init__(self, conf):
4644
self.current_metadata = None
4745
self.current_metadata_location = None
4846
self.base_location = None
47+
self.should_refresh = True
4948
self.version = -1
5049

5150
def current(self):
@@ -59,38 +58,62 @@ def data_location(self):
5958
return "{base_location}/{data}".format(base_location=self.base_location,
6059
data=BaseMetastoreTableOperations.DATA_FOLDER_NAME)
6160

61+
def request_refresh(self):
62+
self.should_refresh = True
63+
6264
def write_new_metadata(self, metadata, version):
65+
from .filesystem import FileSystemOutputFile
66+
6367
if self.base_location is None:
6468
self.base_location = metadata.location
6569

6670
new_filename = BaseMetastoreTableOperations.new_table_metadata_filename(self.base_location,
6771
version)
68-
new_metadata_location = HadoopOutputFile.from_path(new_filename, self.conf)
72+
new_metadata_location = FileSystemOutputFile.from_path(new_filename, self.conf)
6973

7074
TableMetadataParser.write(metadata, new_metadata_location)
7175
return new_filename
7276

7377
def refresh_from_metadata_location(self, new_location, num_retries=20):
7478
if not self.current_metadata_location == new_location:
7579
_logger.info("Refreshing table metadata from new version: %s" % new_location)
80+
self.retryable_refresh(new_location)
7681

77-
self.retryable_refresh(new_location)
82+
self.should_refresh = False
7883

7984
def new_input_file(self, path):
80-
return HadoopInputFile.from_location(path, self.conf)
85+
from .filesystem import FileSystemInputFile
86+
87+
return FileSystemInputFile.from_location(path, self.conf)
8188

8289
def new_metadata_file(self, filename):
83-
return HadoopOutputFile.from_path(BaseMetastoreTableOperations.new_metadata_location(self.base_location,
84-
filename),
85-
self.conf)
90+
from .filesystem import FileSystemOutputFile
91+
92+
return FileSystemOutputFile.from_path(BaseMetastoreTableOperations.new_metadata_location(self.base_location,
93+
filename),
94+
self.conf)
95+
96+
def metadata_file_location(self, file_name, metadata=None):
97+
if metadata is None:
98+
return self.metadata_file_location(file_name, metadata=self.current())
99+
100+
metadata_location = metadata.properties.get(TableProperties.WRITE_METADATA_LOCATION)
101+
102+
if metadata_location is not None:
103+
return "{}/{}".format(metadata_location, file_name)
104+
else:
105+
return "{}/{}/{}".format(metadata.location, BaseMetastoreTableOperations.METADATA_FOLDER_NAME, file_name)
86106

87107
def delete_file(self, path):
108+
from .filesystem import get_fs
88109
get_fs(path, self.conf).delete(path, False)
89110

90111
@retry(wait_incrementing_start=100, wait_exponential_multiplier=4,
91112
wait_exponential_max=5000, stop_max_delay=600000, stop_max_attempt_number=2)
92113
def retryable_refresh(self, location):
93-
self.current_metadata = TableMetadataParser.read(self, HadoopInputFile.from_location(location, self.conf))
114+
from .filesystem import FileSystemInputFile
115+
116+
self.current_metadata = TableMetadataParser.read(self, FileSystemInputFile.from_location(location, self.conf))
94117
self.current_metadata_location = location
95118
self.base_location = self.current_metadata.location
96119
self.version = BaseMetastoreTableOperations.parse_version(location)

iceberg/core/data_files.py

Lines changed: 5 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -20,8 +20,8 @@
2020
Metrics)
2121
from iceberg.api.types import Conversions
2222

23+
from .filesystem import FileSystemInputFile
2324
from .generic_data_file import GenericDataFile
24-
from .hadoop import HadoopInputFile
2525
from .partition_data import PartitionData
2626

2727

@@ -78,7 +78,7 @@ def copy(spec, partition):
7878

7979
@staticmethod
8080
def from_input_file(input_file, row_count, partition_data=None, metrics=None):
81-
if isinstance(input_file, HadoopInputFile):
81+
if isinstance(input_file, FileSystemInputFile):
8282
return DataFiles.from_stat(input_file.get_stat(), row_count,
8383
partition_data=partition_data, metrics=metrics)
8484

@@ -124,6 +124,7 @@ def clear(self):
124124
self.null_value_counts = None
125125
self.lower_bounds = None
126126
self.upper_bounds = None
127+
return self
127128

128129
def copy(self, to_copy):
129130
if self.is_partitioned:
@@ -148,7 +149,7 @@ def with_status(self, stat):
148149
return self
149150

150151
def with_input_file(self, input_file):
151-
if isinstance(input_file, HadoopInputFile):
152+
if isinstance(input_file, FileSystemInputFile):
152153
self.with_status(input_file.get_stat())
153154

154155
self.file_path = self.location()
@@ -158,6 +159,7 @@ def with_input_file(self, input_file):
158159

159160
def with_path(self, path):
160161
self.file_path = path
162+
return self
161163

162164
def with_format(self, fmt):
163165
if isinstance(fmt, FileFormat):
Lines changed: 7 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -15,8 +15,12 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18-
__all__ = ["get_fs", "HadoopInputFile", "HadoopOutputFile"]
18+
__all__ = ["get_fs", "FileStatus", "FileSystem", "FileSystemInputFile", "FileSystemOutputFile",
19+
"FilesystemTableOperations", "FilesystemTables", "S3File", "S3FileSystem"]
1920

20-
from .hadoop_input_file import HadoopInputFile
21-
from .hadoop_output_file import HadoopOutputFile
21+
from .file_status import FileStatus
22+
from .file_system import FileSystem, FileSystemInputFile, FileSystemOutputFile
23+
from .filesystem_table_operations import FilesystemTableOperations
24+
from .filesystem_tables import FilesystemTables
25+
from .s3_filesystem import S3File, S3FileSystem
2226
from .util import get_fs
Lines changed: 64 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -17,12 +17,33 @@
1717

1818
import gzip
1919

20-
from iceberg.api.io import InputFile
20+
from iceberg.api.io import InputFile, OutputFile
2121

2222
from .util import get_fs
2323

2424

25-
class HadoopInputFile(InputFile):
25+
class FileSystem(object):
26+
27+
def open(self, path, mode='rb'):
28+
raise NotImplementedError()
29+
30+
def create(self, path, overwrite=False):
31+
raise NotImplementedError()
32+
33+
def exists(self, path):
34+
raise NotImplementedError()
35+
36+
def delete(self, path):
37+
raise NotImplementedError()
38+
39+
def stat(self, path):
40+
raise NotImplementedError()
41+
42+
def rename(self, src, dest):
43+
raise NotImplementedError()
44+
45+
46+
class FileSystemInputFile(InputFile):
2647

2748
def __init__(self, fs, path, conf, length=None, stat=None):
2849
self.fs = fs
@@ -34,7 +55,7 @@ def __init__(self, fs, path, conf, length=None, stat=None):
3455
@staticmethod
3556
def from_location(location, conf):
3657
fs = get_fs(location, conf)
37-
return HadoopInputFile(fs, location, conf)
58+
return FileSystemInputFile(fs, location, conf)
3859

3960
def location(self):
4061
return self.path
@@ -57,5 +78,43 @@ def new_stream(self, gzipped=False):
5778
for line in fo:
5879
yield line
5980

60-
def new_fo(self):
61-
return self.fs.open(self.location())
81+
def new_fo(self, mode="rb"):
82+
return self.fs.open(self.location(), mode=mode)
83+
84+
def __repr__(self):
85+
return "FileSystemInputFile({})".format(self.path)
86+
87+
def __str__(self):
88+
return self.__repr__()
89+
90+
91+
class FileSystemOutputFile(OutputFile):
92+
93+
@staticmethod
94+
def from_path(path, conf):
95+
return FileSystemOutputFile(path, conf)
96+
97+
def __init__(self, path, conf):
98+
self.path = path
99+
self.conf = conf
100+
101+
def create(self, mode="w"):
102+
fs = get_fs(self.path, self.conf)
103+
if fs.exists(self.path):
104+
raise RuntimeError("File %s already exists" % self.path)
105+
106+
return fs.open(self.path, mode=mode)
107+
108+
def create_or_overwrite(self):
109+
fs = get_fs(self.path, self.conf)
110+
111+
return fs.open(self.path, "wb")
112+
113+
def location(self):
114+
return str(self.path)
115+
116+
def __repr__(self):
117+
return "FileSystemOutputFile({})".format(self.path)
118+
119+
def __str__(self):
120+
return self.__repr__()

0 commit comments

Comments
 (0)
pFad - Phonifier reborn

Pfad - The Proxy pFad © 2024 Your Company Name. All rights reserved.





Check this box to remove all script contents from the fetched content.



Check this box to remove all images from the fetched content.


Check this box to remove all CSS styles from the fetched content.


Check this box to keep images inefficiently compressed and original size.

Note: This service is not intended for secure transactions such as banking, social media, email, or purchasing. Use at your own risk. We assume no liability whatsoever for broken pages.


Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy