Welcome, guest! Login / Register - Why register?
Psst.. new poll here.
Psst.. new forums here.
Microsoft is blocking us again (TY IP Reputation!), so don't bother with any of their useless mail servers here; just use OAuth login instead. Thank the nice Russians for causing that. :)

Paste

Pasted as SQL by registered user vvillacorta ( 2 years ago )
##### UTILS::

# Database name handed to the table helpers as their `database` argument.
esquema_vpc = "disc_comercial"
# Athena workgroup used as the default `grupo` of generate_table.
grupo_vpc = "ibk-discovery-comercial-work-group"
# Base S3 location that per-table output paths are built from.
path_ = (
    "s3://ibk-discovery-comercial-us-east-1-654654352211-data"
    "/discovery/comercial/"
)


###### NOTEBOOKS


def generate_table(esquema_vpc, query_, table, path, llave_,
                   formato='PARQUET', compresion='SNAPPY', grupo=grupo_vpc):
    """Rebuild an Athena table from a query through a CTAS statement.

    The CTAS table is named ``table + '_SG'`` and is written to ``path`` with
    every occurrence of ``table`` replaced by that name. Before the CTAS
    runs, the objects under the target S3 prefix are deleted and both the
    ``_SG`` table and ``table`` are dropped from the catalog (best effort).

    Relies on names defined outside this function: ``bucket``, ``bucket_s3``,
    ``wr``, ``time`` and ``boto3``.
    NOTE(review): ``bucket`` is presumably the bucket name contained in
    ``path`` and ``bucket_s3`` the matching boto3 ``Bucket`` resource --
    confirm against the notebook setup cell.

    Args:
        esquema_vpc: Database passed to the catalog and CTAS calls.
        query_: SQL text executed by the CTAS statement.
        table: Base table name; also the substring replaced inside ``path``.
        path: S3 location that contains ``table`` in its path.
        llave_: Single column passed as ``partitioning_info``.
        formato: Value for ``storage_format``.
        compresion: Value for ``write_compression``.
        grupo: Athena workgroup for the CTAS query.

    Returns:
        A tuple ``(del_, result_)``: ``del_`` is the value returned by the
        last ``delete_table_if_exists`` call that succeeded (``{}`` if both
        failed) and ``result_`` is the value returned by
        ``wr.athena.create_ctas_table``.

    Raises:
        ValueError: If the S3 prefix derived from ``path`` is empty, because
            deleting with an empty prefix would remove every object in the
            bucket.
    """
    # Derived names; the local path is kept separate from the module-level
    # ``path_`` constant to avoid shadowing it.
    sg_table = table + '_SG'
    ctas_path = path.replace(table, sg_table)
    # Key prefix = whatever follows the bucket name, minus the leading '/'.
    path_del = ctas_path.split(bucket)[-1][1:]

    # Safety guard: Prefix='' matches every key, so the delete below would
    # wipe the whole bucket. Fail before any side effect happens.
    if not path_del:
        raise ValueError(
            'Refusing to delete with an empty S3 prefix derived from '
            'path={!r}'.format(path)
        )

    # Remove the physical location of the previous CTAS output.
    bucket_s3.objects.filter(Prefix=path_del).delete()
    print(sg_table, ctas_path, path_del)

    # Drop the tables in Athena. This is deliberately best effort: a failure
    # is reported and the rebuild continues.
    del_ = {}
    for table_name in (sg_table, table):
        try:
            del_ = wr.catalog.delete_table_if_exists(
                database=esquema_vpc, table=table_name
            )
        except Exception as e:
            print('e:::::: ', str(e))

    # NOTE(review): fixed pause kept from the original; presumably gives the
    # S3/catalog deletions time to settle before the CTAS -- confirm.
    time.sleep(5)

    # Create the table again.
    result_ = wr.athena.create_ctas_table(
        sql=query_,
        database=esquema_vpc,
        ctas_table=sg_table,
        wait=True,
        s3_output=ctas_path,
        storage_format=formato,
        write_compression=compresion,
        partitioning_info=[llave_],
        athena_query_wait_polling_delay=0.5,
        boto3_session=boto3.Session(),
        workgroup=grupo
    )
    return del_, result_

def apply_create(table='X', path='X', llave='X', query='X'):
    """Run generate_table on ``esquema_vpc`` and summarize its outcome.

    Returns a tuple of the first element returned by ``generate_table`` and
    the ``'Status'`` entry of the CTAS query metadata's raw payload.
    """
    deleted, ctas_info = generate_table(esquema_vpc, query, table, path, llave)
    status = ctas_info['ctas_query_metadata'].raw_payload['Status']
    return deleted, status





### USAGE:
# Target table, its S3 location under the base path, and the partition column.
table_fr = 'HM_VMVP_FEEDBACK_RUC'
path_fr = f'{path_}vpc/aceptacion/athena_nuevomes/{table_fr}/'
llave_fr = "p_periodo"
# Placeholder: the SQL query goes inside this string.
query_fr = """
       acá va el query
"""
result = generate_table(
    esquema_vpc=esquema_vpc,
    query_=query_fr,
    table=table_fr,
    path=path_fr,
    llave_=llave_fr,
)
# Bare expression so a notebook cell displays (delete result, CTAS status).
result[0], result[1]["ctas_query_metadata"].raw_payload["Status"]

 

Revise this Paste

Your Name: Code Language: