Psst.. new poll here.
Psst.. new forums here.
Microsoft is blocking us again (TY IP Reputation!), so don't bother with any of their useless mail servers here; just use OAuth login instead. Thank the nice Russians for causing that. :)
Paste
Pasted as SQL by registered user vvillacorta ( 3 years ago )
# Database name used by default as the `database=` argument of the
# catalog/Athena calls made by the functions in this file.
esquema_vpc = 'd_mdl_vpc_disc'
def only_del(table, path):
    """Remove the stored objects and the catalog entry of the ``<table>_SG`` table.

    Args:
        table: Base table name; ``'_SG'`` is appended to obtain the table
            that is actually dropped.
        path: Location string containing ``table``; every occurrence of
            ``table`` in it is replaced by the suffixed name.

    Returns:
        None.
    """
    # Derive the suffixed table name, its location and the object prefix.
    sg_table = table + '_SG'
    sg_path = path.replace(table, sg_table)
    # Keep what follows the last occurrence of the module-level `bucket`
    # string, minus one leading character (presumably the '/' separator).
    s3_prefix = sg_path.rpartition(bucket)[2][1:]

    # Delete the physical location.
    # NOTE(review): if this prefix ever comes out empty the filter would
    # presumably match every object in the bucket -- confirm callers always
    # pass a full path.
    bucket_s3.objects.filter(Prefix=s3_prefix).delete()
    print(sg_table, sg_path, s3_prefix)

    # Drop the table from the catalog in the `esquema_vpc` database.
    wr.catalog.delete_table_if_exists(database=esquema_vpc, table=sg_table)
def generate_table(esquema_vpc=esquema_vpc, query_='', table='', path='', llave_='periodo',
                   formato='Parquet', compresion='SNAPPY', grupo='athenav2'):
    """Drop the ``<table>_SG`` table and rebuild it from ``query_`` with a CTAS.

    Args:
        esquema_vpc: Database passed to the catalog and Athena calls.
        query_: SQL text handed to ``create_ctas_table`` as ``sql``.
        table: Base table name; ``'_SG'`` is appended to it.
        path: Location string containing ``table``; every occurrence of
            ``table`` is replaced by the suffixed name and the result is
            used as ``s3_output``.
        llave_: Single column name passed as ``partitioning_info``.
        formato: Value passed as ``storage_format``.
        compresion: Value passed as ``write_compression``.
        grupo: Value passed as ``workgroup``.

    Returns:
        A ``(del_, result_)`` tuple: the return value of
        ``wr.catalog.delete_table_if_exists`` and the return value of
        ``wr.athena.create_ctas_table``.
    """
    # Derive the suffixed table name, its location and the object prefix.
    sg_table = table + '_SG'
    sg_path = path.replace(table, sg_table)
    # Keep what follows the last occurrence of the module-level `bucket`
    # string, minus one leading character (presumably the '/' separator).
    s3_prefix = sg_path.rpartition(bucket)[2][1:]
    print("tabla: ", sg_table)
    print("uri: ", sg_path)
    print("ruta: ", s3_prefix)

    # Delete the physical location.
    # NOTE(review): if this prefix ever comes out empty the filter would
    # presumably match every object in the bucket -- confirm callers always
    # pass a full path.
    bucket_s3.objects.filter(Prefix=s3_prefix).delete()
    print(sg_table, sg_path, s3_prefix)

    # Drop the table from the catalog.
    del_ = wr.catalog.delete_table_if_exists(database=esquema_vpc, table=sg_table)
    # Pause before recreating (presumably to let the deletions settle --
    # TODO confirm why 5 seconds).
    time.sleep(5)

    # Create the table again.
    ctas_options = dict(
        sql=query_,
        database=esquema_vpc,
        ctas_table=sg_table,
        wait=True,
        s3_output=sg_path,
        storage_format=formato,
        write_compression=compresion,
        partitioning_info=[llave_],
        athena_query_wait_polling_delay=0.5,
        boto3_session=boto3.Session(),
        workgroup=grupo,
    )
    result_ = wr.athena.create_ctas_table(**ctas_options)
    return del_, result_
def generate_table_new(query_='', table='', path='', llave_='periodo', esquema_vpc=esquema_vpc,
                       formato='Parquet', compresion='SNAPPY', grupo='athenav2'):
    """Drop the ``<table>_SG`` table and rebuild it with a hand-written CTAS statement.

    Args:
        query_: SELECT text embedded inside the ``AS ( ... )`` clause.
        table: Base table name; ``'_SG'`` is appended to it.
        path: Location string containing ``table``; every occurrence of
            ``table`` is replaced by the suffixed name and the result is
            used as ``external_location``.
        llave_: Column name placed in ``partitioned_by``.
        esquema_vpc: Database used for the catalog call, the table
            qualifier in the statement and the query execution.
        formato: Value placed in the ``format`` table property.
        compresion: Value placed in the ``parquet_compression`` property.
        grupo: Workgroup passed to ``read_sql_query``.

    Returns:
        A ``(del_, result_)`` tuple: the return value of
        ``wr.catalog.delete_table_if_exists`` and the return value of
        ``wr.athena.read_sql_query``.
    """
    # Derive the suffixed table name, its location and the object prefix.
    sg_table = table + '_SG'
    sg_path = path.replace(table, sg_table)
    # Keep what follows the last occurrence of the module-level `bucket`
    # string, minus one leading character (presumably the '/' separator).
    s3_prefix = sg_path.rpartition(bucket)[2][1:]

    # Delete the physical location.
    # NOTE(review): if this prefix ever comes out empty the filter would
    # presumably match every object in the bucket -- confirm callers always
    # pass a full path.
    bucket_s3.objects.filter(Prefix=s3_prefix).delete()
    print("DEL S3: ", sg_table, sg_path, s3_prefix)

    # Drop the table from the catalog.
    del_ = wr.catalog.delete_table_if_exists(database=esquema_vpc, table=sg_table)
    print("DEL ATHENA")
    # Pause before recreating (presumably to let the deletions settle --
    # TODO confirm why 5 seconds).
    time.sleep(5)

    # Create the table again. The template lines are kept flush-left so the
    # SQL text that is printed and executed stays exactly as before.
    ctas_template = """
CREATE TABLE {}.{}
WITH ( format = '{}',
parquet_compression = '{}',
partitioned_by = ARRAY['{}'],
external_location= '{}'
)
AS (
{}
)
"""
    ctas_sql = ctas_template.format(
        esquema_vpc, sg_table, formato, compresion, llave_, sg_path, query_
    )
    print(ctas_sql)

    result_ = wr.athena.read_sql_query(
        ctas_sql,
        database=esquema_vpc,
        workgroup=grupo,
        ctas_approach=False,
    )
    return del_, result_
def apply_create_new(table='X', path='X', llave='X', query='X'):
    """Rebuild the ``<table>_SG`` table through ``generate_table_new``.

    Args:
        table: Base table name forwarded as ``table``.
        path: Location string forwarded as ``path``.
        llave: Partition column forwarded as ``llave_``.
        query: SELECT text forwarded as ``query_``.

    Returns:
        The ``(del_, result_)`` pair returned by ``generate_table_new``.
    """
    # Pass everything by keyword: generate_table_new's parameter order is
    # (query_, table, path, llave_, esquema_vpc, ...), so the previous
    # positional call (esquema_vpc, query, table, path, llave) bound every
    # value to the wrong parameter.
    del_, result_ = generate_table_new(
        query_=query,
        table=table,
        path=path,
        llave_=llave,
        esquema_vpc=esquema_vpc,
    )
    return del_, result_
def apply_create(table='X', path='X', llave='X', query='X', esquema_vpc=esquema_vpc):
    """Rebuild the ``<table>_SG`` table through ``generate_table`` and report its status.

    Args:
        table: Base table name forwarded to ``generate_table``.
        path: Location string forwarded to ``generate_table``.
        llave: Partition column forwarded to ``generate_table``.
        query: SQL text forwarded to ``generate_table``.
        esquema_vpc: Database forwarded to ``generate_table``.

    Returns:
        A tuple of the first element returned by ``generate_table`` and the
        ``'Status'`` entry of the CTAS query metadata's ``raw_payload``.
    """
    dropped, ctas_result = generate_table(esquema_vpc, query, table, path, llave)
    ctas_metadata = ctas_result['ctas_query_metadata']
    return dropped, ctas_metadata.raw_payload['Status']
Revise this Paste