Welcome, guest! Login / Register - Why register?
Psst.. new poll here.
Psst.. new forums here.
Microsoft is blocking us again (TY IP Reputation!), so don't bother with any of their useless mail servers here; just use OAuth login instead. Thank the nice Russians for causing that. :)

Paste

Pasted as SQL by registered user vvillacorta ( 3 years ago )
# Default Glue/Athena database in which the helpers below drop and (re)create
# the ``<table>_SG`` tables.
esquema_vpc = "d_mdl_vpc_disc"

def only_del(table, path):
    """Delete the ``<table>_SG`` copy of a table: its S3 objects and its catalog entry.

    The S3 location of the copy is derived from ``path`` by replacing every
    occurrence of ``table`` with ``table + '_SG'``. The object-key prefix to
    delete is whatever follows the module-level ``bucket`` name in that URI,
    minus the leading separator character.

    Args:
        table: Base table name; the table actually dropped is ``table + '_SG'``.
        path: S3 URI of the base table's data. It is expected to contain both
            ``bucket`` and ``table``.

    Returns:
        The value returned by ``wr.catalog.delete_table_if_exists`` for the
        ``_SG`` table.

    Raises:
        ValueError: if ``table`` is empty, or the derived S3 prefix is empty.
            An empty prefix would make the delete match every object in the
            bucket.
    """
    # An empty table name makes str.replace insert '_SG' between every
    # character of the path, producing a meaningless location.
    if not table:
        raise ValueError("table must be a non-empty table name")

    # Variable setup: name and location of the *_SG copy.
    table_ = table + '_SG'
    path_ = path.replace(table, table_)
    path_del = path_.split(bucket)[-1][1:]

    # filter(Prefix='') matches the whole bucket, so never issue that delete.
    if not path_del:
        raise ValueError(
            "refusing to delete: empty S3 prefix derived from path {!r}".format(path)
        )
    # NOTE(review): if ``bucket`` does not occur in ``path_``, the prefix is the
    # URI minus its first character and will most likely match nothing.

    # Remove the physical data in S3.
    bucket_s3.objects.filter(Prefix=path_del).delete()
    print(table_, path_, path_del)

    # Drop the table from the Athena/Glue catalog.
    del_ = wr.catalog.delete_table_if_exists(database=esquema_vpc, table=table_)
    return del_
    

def generate_table(esquema_vpc=esquema_vpc, query_='', table='', path='', llave_='periodo',
                   formato='Parquet', compresion='SNAPPY', grupo='athenav2'):
    """Rebuild ``<table>_SG`` in Athena with ``wr.athena.create_ctas_table``.

    First deletes the S3 objects under the ``_SG`` location and drops the
    ``_SG`` table from the catalog. Then runs ``query_`` as a CTAS that writes
    to the ``_SG`` location, partitioned by the single column ``llave_``.

    Args:
        esquema_vpc: Database in which the table is dropped and created.
        query_: SELECT statement whose result populates the table.
        table: Base table name; the created table is ``table + '_SG'``.
        path: S3 URI of the base table. Every occurrence of ``table`` in it is
            replaced by ``table + '_SG'`` to get the output location.
        llave_: Partition column.
        formato: CTAS storage format (``storage_format``).
        compresion: CTAS compression (``write_compression``).
        grupo: Athena workgroup.

    Returns:
        Tuple ``(del_, result_)``. ``del_`` is the result of
        ``wr.catalog.delete_table_if_exists`` and ``result_`` is the result of
        ``wr.athena.create_ctas_table``.

    Raises:
        ValueError: if ``table`` is empty, or the derived S3 prefix is empty
            (which would otherwise delete every object in the bucket).
    """
    # With table='' (the default), str.replace would scatter '_SG' through
    # the whole path, so reject it up front.
    if not table:
        raise ValueError("table must be a non-empty table name")

    # Variable setup: name and location of the *_SG copy.
    table_ = table + '_SG'
    path_ = path.replace(table, table_)
    path_del = path_.split(bucket)[-1][1:]
    print("tabla: ", table_)
    print("uri: ", path_)
    print("ruta: ", path_del)

    # filter(Prefix='') matches the entire bucket, so never issue that delete.
    if not path_del:
        raise ValueError(
            "refusing to delete: empty S3 prefix derived from path {!r}".format(path)
        )

    # Remove the physical data in S3.
    bucket_s3.objects.filter(Prefix=path_del).delete()

    # Drop the table from the Athena/Glue catalog.
    del_ = wr.catalog.delete_table_if_exists(database=esquema_vpc, table=table_)

    # NOTE(review): presumably gives S3/catalog time to settle after the
    # deletes before re-creating. Confirm whether 5 s is actually needed.
    time.sleep(5)

    # Re-create the table via CTAS, blocking until the query finishes.
    result_ = wr.athena.create_ctas_table(
        sql=query_,
        database=esquema_vpc,
        ctas_table=table_,
        wait=True,
        s3_output=path_,
        storage_format=formato,
        write_compression=compresion,
        partitioning_info=[llave_],
        athena_query_wait_polling_delay=0.5,
        boto3_session=boto3.Session(),
        workgroup=grupo
    )
    return del_, result_


def generate_table_new(query_='', table='', path='', llave_='periodo', esquema_vpc=esquema_vpc,
                   formato='Parquet', compresion='SNAPPY', grupo='athenav2'):
    """Rebuild ``<table>_SG`` by issuing a hand-written ``CREATE TABLE ... AS`` statement.

    First deletes the S3 objects under the ``_SG`` location and drops the
    ``_SG`` table from the catalog. Then submits a CTAS through
    ``wr.athena.read_sql_query`` with ``ctas_approach=False``.

    Note the parameter order: ``query_`` comes first and ``esquema_vpc``
    fifth, which differs from ``generate_table``.

    Args:
        query_: SELECT statement whose result populates the table.
        table: Base table name; the created table is ``table + '_SG'``.
        path: S3 URI of the base table. Every occurrence of ``table`` in it is
            replaced by ``table + '_SG'`` to get ``external_location``.
        llave_: Partition column.
        esquema_vpc: Database in which the table is dropped and created.
        formato: Value for the CTAS ``format`` property.
        compresion: Value for the CTAS ``parquet_compression`` property.
        grupo: Athena workgroup.

    Returns:
        Tuple ``(del_, result_)``. ``del_`` is the result of
        ``wr.catalog.delete_table_if_exists`` and ``result_`` is whatever
        ``wr.athena.read_sql_query`` returns for the CTAS statement.

    Raises:
        ValueError: if ``table`` is empty, or the derived S3 prefix is empty
            (which would otherwise delete every object in the bucket).
    """
    # With table='' (the default), str.replace would scatter '_SG' through
    # the whole path, so reject it up front.
    if not table:
        raise ValueError("table must be a non-empty table name")

    # Variable setup: name and location of the *_SG copy.
    table_ = table + '_SG'
    path_ = path.replace(table, table_)
    path_del = path_.split(bucket)[-1][1:]

    # filter(Prefix='') matches the entire bucket, so never issue that delete.
    if not path_del:
        raise ValueError(
            "refusing to delete: empty S3 prefix derived from path {!r}".format(path)
        )

    # Remove the physical data in S3.
    bucket_s3.objects.filter(Prefix=path_del).delete()
    print("DEL S3: ", table_, path_, path_del)

    # Drop the table from the Athena/Glue catalog.
    del_ = wr.catalog.delete_table_if_exists(database=esquema_vpc, table=table_)
    print("DEL ATHENA")

    # NOTE(review): presumably gives S3/catalog time to settle after the
    # deletes before re-creating. Confirm whether 5 s is actually needed.
    time.sleep(5)

    # Re-create the table. Every value is interpolated directly into the SQL
    # text, so callers must pass only trusted names, paths and queries.
    # NOTE(review): 'parquet_compression' is emitted regardless of `formato`.
    # For non-Parquet formats, verify that Athena accepts this property.
    query_create = """
            CREATE TABLE {}.{}
            WITH ( format = '{}', 
                     parquet_compression = '{}', 
                     partitioned_by = ARRAY['{}'],
                     external_location= '{}'
                   )
            AS (
                {}
            )
    """.format(esquema_vpc, table_, formato, compresion, llave_, path_, query_)
    print(query_create)

    result_ = wr.athena.read_sql_query(
        query_create,
        database=esquema_vpc,
        workgroup=grupo,
        ctas_approach=False
    )
    return del_, result_




def apply_create_new(table='X', path='X', llave='X', query='X'):
    """Rebuild ``<table>_SG`` in the module's ``esquema_vpc`` via ``generate_table_new``.

    Args:
        table: Base table name.
        path: S3 URI of the base table.
        llave: Partition column.
        query: SELECT statement whose result populates the table.

    Returns:
        The ``(del_, result_)`` pair produced by ``generate_table_new``.
    """
    # generate_table_new's positional order is (query_, table, path, llave_,
    # esquema_vpc, ...), unlike generate_table. The previous positional call
    # shifted every argument into the wrong slot (schema as SQL, SQL as table
    # name, ...). Keyword arguments make the mapping explicit.
    del_, result_ = generate_table_new(
        query_=query,
        table=table,
        path=path,
        llave_=llave,
        esquema_vpc=esquema_vpc,
    )
    return del_, result_

def apply_create(table='X', path='X', llave='X', query='X', esquema_vpc=esquema_vpc):
    """Rebuild ``<table>_SG`` via ``generate_table`` and report the CTAS query status.

    Args:
        table: Base table name.
        path: S3 URI of the base table.
        llave: Partition column.
        query: SELECT statement whose result populates the table.
        esquema_vpc: Database in which the table is dropped and created.

    Returns:
        Tuple of the catalog-delete result and the ``'Status'`` entry of the
        CTAS query metadata's ``raw_payload``. This is presumably the Athena
        QueryExecution status (state and timestamps); verify against
        awswrangler.
    """
    deleted, ctas_info = generate_table(
        esquema_vpc=esquema_vpc,
        query_=query,
        table=table,
        path=path,
        llave_=llave,
    )
    query_metadata = ctas_info['ctas_query_metadata']
    return deleted, query_metadata.raw_payload['Status']

 

Revise this Paste

Your Name: Code Language: