Psst.. new poll here.
Psst.. new forums here.
Microsoft is blocking us again (TY IP Reputation!) so don't bother with any of their useless mail servers here and just use OAuth login instead. Thank the nice Russians for causing that. :)
Paste
Pasted as Python by registered user vvillacorta ( 3 years ago )
def generate_table(esquema_vpc, query_, table, path, llave_,
                   formato='PARQUET', compresion='SNAPPY', grupo='athenav2'):
    """Rebuild the ``<table>_SG`` Athena table from a CTAS query.

    The target table name is ``table + '_SG'`` and the target S3 location is
    ``path`` with every occurrence of ``table`` replaced by that new name.
    Everything stored under the target location is deleted, the catalog entry
    for the ``_SG`` table is dropped if present, and the table is re-created
    with ``wr.athena.create_ctas_table``. The source ``table`` itself is never
    dropped.

    Relies on the module-level names ``bucket`` and ``bucket_s3`` defined
    outside this function (presumably the bucket name and the matching boto3
    ``Bucket`` resource -- confirm against their definitions).

    Args:
        esquema_vpc: Database passed to both the catalog delete and the CTAS.
        query_: SQL ``SELECT`` statement used as the CTAS source.
        table: Base table name; ``'_SG'`` is appended to it.
        path: S3 URI for the base table; it should contain ``table`` so the
            derived location differs from the base one.
        llave_: Column used as the single partitioning key.
        formato: Storage format handed to the CTAS (default ``'PARQUET'``).
        compresion: Write compression handed to the CTAS (default
            ``'SNAPPY'``).
        grupo: Athena workgroup (default ``'athenav2'``).

    Returns:
        tuple: ``(del_, result_)`` -- the value returned by
        ``wr.catalog.delete_table_if_exists`` and the value returned by
        ``wr.athena.create_ctas_table``.

    Raises:
        ValueError: If the S3 key prefix derived from ``path`` is empty.
            An empty prefix matches every object in the bucket, so deleting
            with it would erase the whole bucket.
    """
    # Derived names: the "_SG" table and its S3 location / key prefix.
    table_ = table + '_SG'
    path_ = path.replace(table, table_)
    # Text after the bucket name, minus the leading "/", is the key prefix.
    path_del = path_.split(bucket)[-1][1:]
    # Report what is about to be removed *before* anything is deleted.
    print(table_, path_, path_del)

    if not path_del:
        raise ValueError(
            'Refusing to delete: empty S3 prefix derived from path '
            '{!r}; it would match every object in the bucket.'.format(path)
        )

    # Remove the physical files stored under the target prefix.
    bucket_s3.objects.filter(Prefix=path_del).delete()

    # Drop the "_SG" table from the catalog if it already exists.
    del_ = wr.catalog.delete_table_if_exists(database=esquema_vpc, table=table_)
    # Brief pause before re-creating (presumably to let the deletions
    # settle -- TODO confirm whether it is still needed).
    time.sleep(2)

    # Re-create the table from the query and wait for the CTAS to finish.
    result_ = wr.athena.create_ctas_table(
        sql=query_,
        database=esquema_vpc,
        ctas_table=table_,
        wait=True,
        s3_output=path_,
        storage_format=formato,
        write_compression=compresion,
        partitioning_info=[llave_],
        athena_query_wait_polling_delay=0.5,
        boto3_session=boto3.Session(),
        workgroup=grupo,
    )
    return del_, result_
def apply_create(table='X', path='X', llave='X', query='X'):
    """Run ``generate_table`` on ``esquema_vpc`` and summarize the outcome.

    ``esquema_vpc`` is a module-level name defined outside this function.

    Args:
        table: Base table name forwarded to ``generate_table``.
        path: S3 URI forwarded to ``generate_table``.
        llave: Partitioning column forwarded to ``generate_table``.
        query: SQL statement forwarded to ``generate_table``.

    Returns:
        tuple: The first element returned by ``generate_table`` and the
        ``'Status'`` entry of the CTAS query metadata's raw payload.
    """
    drop_outcome, ctas_info = generate_table(esquema_vpc, query, table, path, llave)
    query_status = ctas_info['ctas_query_metadata'].raw_payload['Status']
    return drop_outcome, query_status
# Base name of the table to build; it is also interpolated into the S3 path.
table_ = 'SUNAT_SOLO_BASE_RIESGOS'

# The query keeps, for each (periodo_val, numruc_val) pair, only the row with
# the highest tiempo_deuda_tributaria_amt (ORDEN = 1). Rows are limited to
# RUCs present in d_mdl_vpc_disc.HM_RIESGOS_DATA_BPE and to periodo_val
# values >= '202201' (compared as strings). periodo_val is repeated as the
# last column, p_periodo, which is the partitioning key.
apply_create(
    query="""
SELECT AA.*, AA.periodo_val p_periodo
FROM(
SELECT ROW_NUMBER() OVER(PARTITION by periodo_val, numruc_val ORDER BY tiempo_deuda_tributaria_amt DESC) ORDEN,
*
FROM e_perm_aws.t_fact_vpc_agg_sunat_reniec
WHERE numruc_val IN (select num_ruc from d_mdl_vpc_disc.HM_RIESGOS_DATA_BPE)
AND periodo_val >= '202201'
) AA
WHERE AA.ORDEN = 1
""",
    llave='p_periodo',
    path=f's3://sagemaker-us-east-1-058528764918/vpc/contactabilidad/athena_2/{table_}/',
    table=table_,
)
Revise this Paste