[bugfix] fix merge conflict that broke Hive support (#3196)

parent e584a9673f
commit e4fba0ffb7
@@ -308,8 +308,12 @@ SILENCE_FAB = True
 # configuration. These blueprints will get integrated in the app
 BLUEPRINTS = []
 
-try:
+# Provide a callable that receives a tracking_url and returns another
+# URL. This is used to translate internal Hadoop job tracker URL
+# into a proxied one
+TRACKING_URL_TRANSFORMER = lambda x: x
+
+try:
     if CONFIG_PATH_ENV_VAR in os.environ:
         # Explicitly import config module that is not in pythonpath; useful
         # for case where app is being executed via pex.
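This hunk restores the TRACKING_URL_TRANSFORMER default in Superset's config module, which the bad merge had dropped. A deployment can override it to rewrite internal Hadoop tracker URLs into user-reachable ones; a minimal sketch of such an override (hostnames are made up for illustration, not part of this commit):

    # superset_config.py -- illustrative override, hypothetical hosts
    def TRACKING_URL_TRANSFORMER(url):
        # Rewrite the internal YARN resource manager host to a proxy host
        return url.replace('http://yarn.internal:8088',
                           'https://hadoop-proxy.example.com')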
@@ -31,8 +31,9 @@ from flask_babel import lazy_gettext as _
 from superset.utils import SupersetTemplateException
 from superset.utils import QueryStatus
-from superset import utils
-from superset import cache_util
+from superset import conf, cache_util, utils
+
+tracking_url_trans = conf.get('TRACKING_URL_TRANSFORMER')
 
 Grain = namedtuple('Grain', 'name label function')
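Note that tracking_url_trans is resolved once, at import time, so the identity lambda from config.py is what applies unless a deployment overrides the setting before this module loads. For instance (hypothetical URL):

    # With no override configured, the default transformer is a no-op
    url = 'http://yarn.internal:8088/proxy/application_1/'  # hypothetical
    assert tracking_url_trans(url) == url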
@@ -683,7 +684,7 @@ class HiveEngineSpec(PrestoEngineSpec):
     @classmethod
     def progress(cls, log_lines):
         total_jobs = 1  # assuming there's at least 1 job
-        current_job = None
+        current_job = 1
         stages = {}
         for line in log_lines:
             match = cls.jobs_stats_r.match(line)
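Defaulting current_job to 1 rather than None matters because the arithmetic and truthiness checks further down use current_job before any "Launching Job" line has been parsed. For reference, the class-level patterns this loop matches against look roughly like the following (paraphrased, not verbatim from the source):

    import re

    # Hive log lines of interest, e.g.:
    #   "INFO  : Total jobs = 2"
    #   "INFO  : Launching Job 1 out of 2"
    #   "INFO  : ... Stage-1 map = 40%,  reduce = 0%"
    jobs_stats_r = re.compile(r'.*INFO.*Total jobs = (?P<max_jobs>[0-9]+)')
    launching_job_r = re.compile(
        r'.*INFO.*Launching Job (?P<job_number>[0-9]+) out of '
        r'(?P<max_jobs>[0-9]+)')
    stage_progress_r = re.compile(
        r'.*INFO.*Stage-(?P<stage_number>[0-9]+).*'
        r'map = (?P<map_progress>[0-9]+)%.*'
        r'reduce = (?P<reduce_progress>[0-9]+)%.*')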
@@ -692,6 +693,7 @@ class HiveEngineSpec(PrestoEngineSpec):
             match = cls.launching_job_r.match(line)
             if match:
                 current_job = int(match.groupdict()['job_number'])
+                total_jobs = int(match.groupdict()['max_jobs']) or 1
                 stages = {}
             match = cls.stage_progress_r.match(line)
             if match:
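The `or 1` guard keeps total_jobs from ever becoming 0, which would otherwise break the percentage math below if a log line reported "out of 0":

    int('0') or 1  # -> 1, so later division by total_jobs stays safe
    int('2') or 1  # -> 2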
@@ -701,10 +703,9 @@ class HiveEngineSpec(PrestoEngineSpec):
                 stages[stage_number] = (map_progress + reduce_progress) / 2
         logging.info(
             "Progress detail: {}, "
-            "total jobs: {}".format(stages, total_jobs))
+            "current job {}, "
+            "total jobs: {}".format(stages, current_job, total_jobs))
 
         if not total_jobs or not current_job:
             return 0
         stage_progress = sum(
             stages.values()) / len(stages.values()) if stages else 0
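With the extra "current job" placeholder, the log message now has three format slots for its three arguments; the old two-slot string logged only stages and total_jobs. A worked example of the overall percentage the method derives from these values (formula paraphrased from this version of progress(); treat it as a sketch):

    total_jobs, current_job = 2, 2
    stages = {1: 80.0}  # stage 1: average of map and reduce percentages
    stage_progress = sum(stages.values()) / len(stages.values())  # 80.0
    progress = 100 * (current_job - 1) / total_jobs + stage_progress / total_jobs
    # -> 50.0 + 40.0 = 90.0: first job done, second job 80% through its stages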
@@ -731,18 +732,16 @@ class HiveEngineSpec(PrestoEngineSpec):
         polled = cursor.poll()
         last_log_line = 0
+        tracking_url = None
+        job_id = None
         while polled.operationState in unfinished_states:
             query = session.query(type(query)).filter_by(id=query.id).one()
             if query.status == QueryStatus.STOPPED:
                 cursor.cancel()
                 break
 
-            resp = cursor.fetch_logs()
-            if resp and resp.log:
-                log = resp.log or ''
-                log_lines = resp.log.splitlines()
-                logging.info("\n".join(log_lines[last_log_line:]))
-                last_log_line = len(log_lines) - 1
+            log = cursor.fetch_logs() or ''
+            if log:
+                log_lines = log.splitlines()
                 progress = cls.progress(log_lines)
                 logging.info("Progress total: {}".format(progress))
                 needs_commit = False
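The bad merge had left the loop reading logs through a Thrift-style response object (resp.log); this hunk switches to a fetch_logs() that returns the log text directly, and hoists tracking_url/job_id initialization to before the loop so they survive across iterations. A defensive sketch (not part of the commit) that tolerates either fetch_logs() shape, in case the pyhive version in use differs:

    def get_log_text(cursor):
        # Normalize fetch_logs() output: plain string, Thrift-style
        # response object exposing a .log attribute, or None.
        resp = cursor.fetch_logs()
        if resp is None:
            return ''
        if isinstance(resp, str):
            return resp
        return getattr(resp, 'log', '') or ''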
@@ -754,8 +753,20 @@ class HiveEngineSpec(PrestoEngineSpec):
                     if tracking_url:
                         logging.info(
                             "Found the tracking url: {}".format(tracking_url))
+                        tracking_url = tracking_url_trans(tracking_url)
+                        logging.info(
+                            "Transformation applied: {}".format(tracking_url))
                         query.tracking_url = tracking_url
+                        job_id = tracking_url.split('/')[-2]
+                        logging.info("Job id: {}".format(job_id))
                         needs_commit = True
+                if job_id and len(log_lines) > last_log_line:
+                    # Wait for job id before logging things out
+                    # this allows for prefixing all log lines and becoming
+                    # searchable in something like Kibana
+                    for l in log_lines[last_log_line:]:
+                        logging.info("[{}] {}".format(job_id, l))
+                    last_log_line = len(log_lines)
                 if needs_commit:
                     session.commit()
             time.sleep(5)
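The job id is taken from the second-to-last path segment of the tracking URL, which assumes the YARN proxy URL ends with a trailing slash. Hypothetical example:

    url = 'http://rm.example.com:8088/proxy/application_1502123456789_0042/'
    job_id = url.split('/')[-2]  # -> 'application_1502123456789_0042'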