diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index 1faac99b64d5..f09e14f06ee6 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -90,6 +90,8 @@ if TYPE_CHECKING: from apache_beam.runners.pipeline_context import PipelineContext +_LOGGER = logging.getLogger(__name__) + __all__ = [ 'BatchElements', 'CoGroupByKey', @@ -499,7 +501,7 @@ def get_secret_bytes(self) -> bytes: request={"name": secret_version_path}) return response.payload.data except api_exceptions.NotFound: - logging.info( + _LOGGER.info( "Secret version %s not found. " "Creating new secret and version.", secret_version_path) @@ -704,7 +706,7 @@ def expand(self, pcoll): try: coder = coder.as_deterministic_coder(self.label) except ValueError: - logging.warning( + _LOGGER.warning( 'GroupByEncryptedKey %s: ' 'The key coder is not deterministic. This may result in incorrect ' 'pipeline output. This can be fixed by adding a type hint to the ' @@ -1025,7 +1027,7 @@ def finish_bundle(self): self._batch = None self._running_batch_size = 0 self._target_batch_size = self._batch_size_estimator.next_batch_size() - logging.info( + _LOGGER.info( "BatchElements statistics: " + self._batch_size_estimator.stats()) @@ -1957,15 +1959,15 @@ def process( log_line += ', pane_info=' + repr(pane_info) if self.level == logging.DEBUG: - logging.debug(log_line) + _LOGGER.debug(log_line) elif self.level == logging.INFO: - logging.info(log_line) + _LOGGER.info(log_line) elif self.level == logging.WARNING: - logging.warning(log_line) + _LOGGER.warning(log_line) elif self.level == logging.ERROR: - logging.error(log_line) + _LOGGER.error(log_line) elif self.level == logging.CRITICAL: - logging.critical(log_line) + _LOGGER.critical(log_line) else: print(log_line)