Hue 整合数据源连接:TiDB 案例

Hue 作为数据分析平台,可以接入多种数据源,方便分析人员和开发人员共享、钻取数据,了解数据情况。但是 Hue 需要先为每种数据源配置各自的连接方式才可以使用,以下提供 Hue 整合 TiDB 的实现方式。

Hue 停止/启动命令

cd hue/build/env/bin
./start.sh hadoop hadoop /data/emr/hue/logs  /usr/local/service/hue/build/env/bin/
./stop.sh

hue 修改配置

cd desktop/conf
vi pseudo-distributed.ini
[notebook]

[[interpreters]]

[[[sparksql]]]
      name=SparkSQL TIDB base
      interface=sqlalchemy
      is_sql=false
      options='{"url": "hive://172.16.48.35:10015"}'
      is_catalog=false
      category=editor
      
[[[mysql]]]
      name=MySQL TIDB base
      interface=sqlalchemy
      options='{"url": "mysql://root:3333@172.1.0.1:4000"}'      
      
      
[[[tidb_pyspark_base]]]
      name=TIDB PySpark Base
      interface=livy
      options='{"kind":"pyspark","driverMemory": "4G", "executorCores": 4, "driverCores": 2,"executorMemory": "4G","conf": {"spark.tispark.pd.addresses":"172.1.0.1:2379","spark.tispark.isolation_read_engines":"tikv,tiflash","spark.tispark.plan.allow_index_read":"false","spark.sql.extensions":"org.apache.spark.sql.TiExtensions"}}'
      is_sql=false
      is_catalog=false
      category=editor
      dialect=pyspark
      
vi  desktop/libs/notebook/src/notebook/connectors/base.py

elif interface == 'livy':
    from notebook.connectors.spark_shell import SparkApi
    options = interpreter.get('options', {})
    optAPi = SparkApi(request.user)
    optAPi.init_options(options)
    return optAPi

_data['snippets'].append(self._make_snippet({ u'type': u'spark', u'status': u'running', u'properties': { u'files': files, u'class': clazz, u'app_jar': jars, u'arguments': arguments, u'archives': [], u'spark_opts': '' } }))

      
      
      
```shell
 vi  desktop/libs/notebook/src/notebook/connectors/spark_shell.py
props['kind'] = lang
if 'kind' in props.keys():
  lang = props['kind']

for k in self.options.keys():
  props[k] = self.options[k]
def execute(self, notebook, snippet):
  """Submit the snippet's statement to its Spark session.

  The session attached to ``snippet`` is looked up in ``notebook`` and
  ``snippet['statement']`` is submitted through the Spark API client obtained
  for ``self.user``.

  Returns:
    A handle dict holding the submitted statement ``id``, with
    ``has_result_set`` set to True and ``sync`` set to False.

  Raises:
    SessionExpired: when the failure message reports that the session was not
      found, the connection was refused, or the session is in state busy.
    Exception: any other failure is re-raised unchanged.
  """
  api = get_spark_api(self.user)
  session = _get_snippet_session(notebook, snippet)

  try:
    response = api.submit_statement(session['id'], snippet['statement'])
    return {
      'id': response['id'],
      'has_result_set': True,
      'sync': False
    }
  except Exception as e:
    message = force_unicode(str(e)).lower()
    # Raw string: "\d" in a plain literal is an invalid escape sequence.
    session_lost = (
      re.search(r"session ('\d+' )?not found", message)
      or 'connection refused' in message
      or 'session is in state busy' in message
    )
    if session_lost:
      raise SessionExpired(e)
    # Bare raise keeps the original traceback intact.
    raise
vi  desktop/libs/notebook/src/notebook/templates/editor_components.mako
<script type="text/javascript">
  // Editor bootstrap: publishes the bindable element selector, the suffix and
  // the view-model options on `window`.
  window.EDITOR_BINDABLE_ELEMENT = '#${ bindableElement }';

  window.EDITOR_SUFFIX = '${ suffix }';

  // 'notebook' when the URL path contains "notebook", otherwise 'editor'.
  var HUE_PUB_SUB_EDITOR_ID = (window.location.pathname.indexOf('notebook') > -1) ? 'notebook' : 'editor';

  // The server-rendered options are extended with the client-side settings below.
  window.EDITOR_VIEW_MODEL_OPTIONS = $.extend(${ options_json | n,unicode,antixss }, {
    huePubSubId: HUE_PUB_SUB_EDITOR_ID,
    user: '${ user.username }',
    userId: ${ user.id },
    suffix: '${ suffix }',
    assistAvailable: true,
    autocompleteTimeout: AUTOCOMPLETE_TIMEOUT,
    // One entry per snippet type: placeholder text, Ace editor mode, icon or
    // image, and whether the type is flagged as a SQL dialect.
    snippetViewSettings: {
      default: {
        placeHolder: '${ _("Example: SELECT * FROM tablename, or press CTRL + space") }',
        aceMode: 'ace/mode/sql',
        snippetIcon: 'fa-database',
        sqlDialect: true
      },
      code: {
        placeHolder: '${ _("Example: 1 + 1, or press CTRL + space") }',
        snippetIcon: 'fa-code'
      },
      hive: {
        placeHolder: '${ _("Example: SELECT * FROM tablename, or press CTRL + space") }',
        aceMode: 'ace/mode/hive',
        snippetImage: '${ static("beeswax/art/icon_beeswax_48.png") }',
        sqlDialect: true
      },
      impala: {
        placeHolder: '${ _("Example: SELECT * FROM tablename, or press CTRL + space") }',
        aceMode: 'ace/mode/impala',
        snippetImage: '${ static("impala/art/icon_impala_48.png") }',
        sqlDialect: true
      },presto: {
        placeHolder: '${ _("Example: SELECT * FROM tablename, or press CTRL + space") }',
        aceMode: 'ace/mode/presto',
        snippetIcon: 'fa-database',
        sqlDialect: true
      },
      elasticsearch: {
        placeHolder: '${ _("Example: SELECT * FROM tablename, or press CTRL + space") }',
        aceMode: 'ace/mode/elasticsearch',
        snippetIcon: 'fa-database',
        sqlDialect: true
      },
      druid: {
        placeHolder: '${ _("Example: SELECT * FROM tablename, or press CTRL + space") }',
        aceMode: 'ace/mode/druid',
        snippetIcon: 'fa-database',
        sqlDialect: true
      },
      bigquery: {
        placeHolder: '${ _("Example: SELECT * FROM tablename, or press CTRL + space") }',
        aceMode: 'ace/mode/bigquery',
        snippetIcon: 'fa-database',
        sqlDialect: true
      },
      phoenix: {
        placeHolder: '${ _("Example: SELECT * FROM tablename, or press CTRL + space") }',
        aceMode: 'ace/mode/phoenix',
        snippetIcon: 'fa-database',
        sqlDialect: true
      },
      ksql: {
        placeHolder: '${ _("Example: SELECT * FROM tablename, or press CTRL + space") }',
        aceMode: 'ace/mode/ksql',
        snippetIcon: 'fa-database',
        sqlDialect: true
      },
      jar : {
        snippetIcon: 'fa-file-archive-o '
      },
      mysql: {
        placeHolder: '${ _("Example: SELECT * FROM tablename, or press CTRL + space") }',
        aceMode: 'ace/mode/mysql',
        snippetIcon: 'fa-database',
        sqlDialect: true
      },
     mysqljdbc: {
        placeHolder: '${ _("Example: SELECT * FROM tablename, or press CTRL + space") }',
        aceMode: 'ace/mode/mysql',
        snippetIcon: 'fa-database',
        sqlDialect: true
      },
      oracle: {
        placeHolder: '${ _("Example: SELECT * FROM tablename, or press CTRL + space") }',
        aceMode: 'ace/mode/oracle',
        snippetIcon: 'fa-database',
        sqlDialect: true
      },
      pig: {
        placeHolder: '${ _("Example: 1 + 1, or press CTRL + space") }',
        aceMode: 'ace/mode/pig',
        snippetImage: '${ static("pig/art/icon_pig_48.png") }'
      },
      postgresql: {
        placeHolder: '${ _("Example: SELECT * FROM tablename, or press CTRL + space") }',
        aceMode: 'ace/mode/pgsql',
        snippetIcon: 'fa-database',
        sqlDialect: true
      },
      solr: {
        placeHolder: '${ _("Example: SELECT fieldA, FieldB FROM collectionname, or press CTRL + space") }',
        aceMode: 'ace/mode/mysql',
        snippetIcon: 'fa-database',
        sqlDialect: true
      },
      kafkasql: {
        placeHolder: '${ _("Example: SELECT fieldA, FieldB FROM collectionname, or press CTRL + space") }',
        aceMode: 'ace/mode/mysql',
        snippetIcon: 'fa-database',
        sqlDialect: true
      },
      java : {
        snippetIcon: 'fa-file-code-o'
      },
      py : {
        snippetIcon: 'fa-file-code-o'
      },
      pyspark: {
        placeHolder: '${ _("Example: 1 + 1, or press CTRL + space") }',
        aceMode: 'ace/mode/python',
        snippetImage: '${ static("spark/art/icon_spark_48.png") }'
      },
      // Entry for the custom TiDB PySpark interpreter. The key is the same as
      // the [[[tidb_pyspark_base]]] section name in the ini configuration, and
      // it uses the same Ace mode and image as `pyspark`.
      tidb_pyspark_base: {
        placeHolder: '${ _("Example: 1+1  or press CTRL + space") }',
        aceMode: 'ace/mode/python',
        snippetImage: '${ static("spark/art/icon_spark_48.png") }'
      },
      // NOTE(review): no [[[tidb_pyspark_Shopee]]] interpreter section appears in
      // the configuration shown in this document. Confirm it exists in the real
      // ini, otherwise this entry is unused.
      tidb_pyspark_Shopee: {
        placeHolder: '${ _("Example: 1 + 1, or press CTRL + space") }',
        aceMode: 'ace/mode/python',
        snippetImage: '${ static("spark/art/icon_spark_48.png") }'
      },
      r: {
        placeHolder: '${ _("Example: 1 + 1, or press CTRL + space") }',
        aceMode: 'ace/mode/r',
        snippetImage: '${ static("spark/art/icon_spark_48.png") }'
      },
      scala: {
        placeHolder: '${ _("Example: 1 + 1, or press CTRL + space") }',
        aceMode: 'ace/mode/scala',
        snippetImage: '${ static("spark/art/icon_spark_48.png") }'
      },
      spark: {
        placeHolder: '${ _("Example: 1 + 1, or press CTRL + space") }',
        aceMode: 'ace/mode/scala',
        snippetImage: '${ static("spark/art/icon_spark_48.png") }'
      },
       spark2: {
        snippetImage: '${ static("spark/art/icon_spark_48.png") }'
      },
      mapreduce: {
        snippetIcon: 'fa-file-archive-o'
      },
      shell: {
        snippetIcon: 'fa-terminal'
      },
      sqoop1: {
        placeHolder: '${ _("Example: import  --connect jdbc:hsqldb:file:db.hsqldb --table TT --target-dir hdfs://localhost:8020/user/foo -m 1") }',
        snippetImage: '${ static("sqoop/art/icon_sqoop_48.png") }'
      },
      distcp: {
        snippetIcon: 'fa-files-o'
      },
      sqlite: {
        placeHolder: '${ _("Example: SELECT * FROM tablename, or press CTRL + space") }',
        aceMode: 'ace/mode/sql',
        snippetIcon: 'fa-database',
        sqlDialect: true
      },
      text: {
        placeHolder: '${ _('Type your text here') }',
        aceMode: 'ace/mode/text',
        snippetIcon: 'fa-header'
      },
      markdown: {
        placeHolder: '${ _('Type your markdown here') }',
        aceMode: 'ace/mode/markdown',
        snippetIcon: 'fa-header'
      }
    }
  });
vi desktop/libs/notebook/src/notebook/api.py
@require_POST
@api_error_handler
def create_notebook(request):
  """Create a new notebook/editor document and return its data as JSON.

  POST fields read: ``type`` (defaults to 'notebook'), ``gist`` and
  ``directory_uuid``. When a gist id is supplied, the editor is pre-filled with
  the gist's statement in presentation mode; otherwise a blank Notebook is used.

  For non-notebook editors the type becomes ``query-<type>``; any resulting
  type that contains 'pyspark' is then collapsed to plain 'pyspark'.

  Returns a JsonResponse carrying ``status`` 0 and the ``notebook`` data.
  """
  post = request.POST
  editor_type = post.get('type', 'notebook')
  gist_id = post.get('gist')
  directory_uuid = post.get('directory_uuid')

  if not gist_id:
    editor = Notebook()
  else:
    gist_doc = _get_gist_document(uuid=gist_id)
    gist_statement = json.loads(gist_doc.data)['statement']
    editor = make_notebook(
      name='',
      description='',
      editor_type=editor_type,
      statement=gist_statement,
      is_presentation_mode=True
    )

  data = editor.get_data()

  if editor_type != 'notebook':
    data.update({
      'name': '',
      'type': 'query-%s' % editor_type,  # TODO: Add handling for non-SQL types
    })

  # Custom interpreters such as "tidb_pyspark_base" are treated as plain pyspark.
  if 'pyspark' in data['type']:
    data['type'] = 'pyspark'

  data['directoryUuid'] = directory_uuid
  editor.data = json.dumps(data)

  return JsonResponse({'status': 0, 'notebook': editor.get_data()})
  
  
@require_POST
@check_document_access_permission
@api_error_handler
def execute(request, engine=None):
  """Execute one snippet of a notebook and return the result as JSON.

  The notebook and the snippet are read as JSON from the POST fields
  ``notebook`` and ``snippet`` (each defaults to an empty object). A snippet
  type or session type that contains 'pyspark' (for example the custom
  ``tidb_pyspark_base`` interpreter) is rewritten to plain 'pyspark' before the
  notebook is executed.

  Args:
    request: the POST request carrying ``notebook`` and ``snippet``.
    engine: not used by this function body.

  Returns:
    A JsonResponse wrapping the result of ``_execute_notebook``.
  """
  notebook = json.loads(request.POST.get('notebook', '{}'))
  snippet = json.loads(request.POST.get('snippet', '{}'))
  # Payload dumps are diagnostics, not errors: log them at debug level and let
  # the logger format lazily.
  LOG.debug('execute  notebook %s', notebook)
  LOG.debug('execute  snippet %s', snippet)

  snippet_type = snippet.get('type')
  if snippet_type is not None and 'pyspark' in snippet_type:
    snippet['type'] = 'pyspark'

  # 'sessions' can be missing (the notebook falls back to {} above), so it must
  # not be indexed directly.
  for session in notebook.get('sessions') or []:
    session_type = session.get('type')
    if session_type is not None and 'pyspark' in session_type:
      session['type'] = 'pyspark'

  with opentracing.tracer.start_span('notebook-execute') as span:
    span.set_tag('user-id', request.user.username)

    response = _execute_notebook(request, notebook, snippet)

    span.set_tag(
      'query-id',
      response['handle']['guid'] if response.get('handle') and response['handle'].get('guid') else None
    )

  return JsonResponse(response)
vi desktop/core/src/desktop/models.py
def _get_editor(self):
  """Build the list of editor entries for the user's interpreters.

  Interpreters whose interface is 'hms' are skipped. An interpreter whose type
  contains 'pyspark' keeps a page URL built from its original type, but its
  ``type`` and ``dialect`` are then overwritten with 'pyspark'.

  This excerpt ends once the list is built; what the method does with it
  afterwards is not shown here.
  """
  interpreters = []

  _interpreters = get_ordered_interpreters(self.user)

  for interpreter in _interpreters:
    if interpreter['interface'] == 'hms':
      continue

    tooltip = _('%s Query') % interpreter['type'].title()
    # Built before the type is overwritten, so the URL keeps the original type.
    page = '/editor/?type=%(type)s' % interpreter

    if 'pyspark' in interpreter['type']:
      tooltip = _('Pyspark Query')
      interpreter['type'] = 'pyspark'
      interpreter['dialect'] = 'pyspark'

    entry = {
      'name': interpreter['name'],
      'type': interpreter['type'],
      'displayName': interpreter['name'],
      'buttonName': _('Query'),
      'tooltip': tooltip,
      'page': page,
      'is_sql': interpreter['is_sql'],
      'dialect': interpreter['dialect'],
    }
    interpreters.append(entry)
```