1818import logging
1919import uuid
2020
21- from iceberg .core .hadoop import (get_fs ,
22- HadoopInputFile ,
23- HadoopOutputFile )
2421from retrying import retry
2522
2623from .table_metadata_parser import TableMetadataParser
2724from .table_operations import TableOperations
25+ from .table_properties import TableProperties
2826
2927_logger = logging .getLogger (__name__ )
3028
@@ -46,6 +44,7 @@ def __init__(self, conf):
4644 self .current_metadata = None
4745 self .current_metadata_location = None
4846 self .base_location = None
47+ self .should_refresh = True
4948 self .version = - 1
5049
5150 def current (self ):
@@ -59,38 +58,62 @@ def data_location(self):
5958 return "{base_location}/{data}" .format (base_location = self .base_location ,
6059 data = BaseMetastoreTableOperations .DATA_FOLDER_NAME )
6160
61+ def request_refresh (self ):
62+ self .should_refresh = True
63+
6264 def write_new_metadata (self , metadata , version ):
65+ from .filesystem import FileSystemOutputFile
66+
6367 if self .base_location is None :
6468 self .base_location = metadata .location
6569
6670 new_filename = BaseMetastoreTableOperations .new_table_metadata_filename (self .base_location ,
6771 version )
68- new_metadata_location = HadoopOutputFile .from_path (new_filename , self .conf )
72+ new_metadata_location = FileSystemOutputFile .from_path (new_filename , self .conf )
6973
7074 TableMetadataParser .write (metadata , new_metadata_location )
7175 return new_filename
7276
7377 def refresh_from_metadata_location (self , new_location , num_retries = 20 ):
7478 if not self .current_metadata_location == new_location :
7579 _logger .info ("Refreshing table metadata from new version: %s" % new_location )
80+ self .retryable_refresh (new_location )
7681
77- self .retryable_refresh ( new_location )
82+ self .should_refresh = False
7883
7984 def new_input_file (self , path ):
80- return HadoopInputFile .from_location (path , self .conf )
85+ from .filesystem import FileSystemInputFile
86+
87+ return FileSystemInputFile .from_location (path , self .conf )
8188
8289 def new_metadata_file (self , filename ):
83- return HadoopOutputFile .from_path (BaseMetastoreTableOperations .new_metadata_location (self .base_location ,
84- filename ),
85- self .conf )
90+ from .filesystem import FileSystemOutputFile
91+
92+ return FileSystemOutputFile .from_path (BaseMetastoreTableOperations .new_metadata_location (self .base_location ,
93+ filename ),
94+ self .conf )
95+
96+ def metadata_file_location (self , file_name , metadata = None ):
97+ if metadata is None :
98+ return self .metadata_file_location (file_name , metadata = self .current ())
99+
100+ metadata_location = metadata .properties .get (TableProperties .WRITE_METADATA_LOCATION )
101+
102+ if metadata_location is not None :
103+ return "{}/{}" .format (metadata_location , file_name )
104+ else :
105+ return "{}/{}/{}" .format (metadata .location , BaseMetastoreTableOperations .METADATA_FOLDER_NAME , file_name )
86106
87107 def delete_file (self , path ):
108+ from .filesystem import get_fs
88109 get_fs (path , self .conf ).delete (path , False )
89110
90111 @retry (wait_incrementing_start = 100 , wait_exponential_multiplier = 4 ,
91112 wait_exponential_max = 5000 , stop_max_delay = 600000 , stop_max_attempt_number = 2 )
92113 def retryable_refresh (self , location ):
93- self .current_metadata = TableMetadataParser .read (self , HadoopInputFile .from_location (location , self .conf ))
114+ from .filesystem import FileSystemInputFile
115+
116+ self .current_metadata = TableMetadataParser .read (self , FileSystemInputFile .from_location (location , self .conf ))
94117 self .current_metadata_location = location
95118 self .base_location = self .current_metadata .location
96119 self .version = BaseMetastoreTableOperations .parse_version (location )
0 commit comments