Psst.. new poll here.
Psst.. new forums here.
Microsoft is blocking us again (TY IP Reputation!), so don't bother with any of their useless mail servers here; just use OAuth login instead. Thank the nice Russians for causing that. :)
Paste
Pasted as Python by registered user vvillacorta ( 2 years ago )
import pip  # retained in case a later cell still references the module
import subprocess
import sys

package_names = ['awswrangler', 'jupyterlab_execute_time']
# pip offers no supported in-process API (`pip.main` is an internal entry
# point), so run it as a subprocess of the interpreter backing this kernel.
_pip_install = subprocess.run(
    [sys.executable, '-m', 'pip', 'install', '--upgrade', *package_names],
    check=False,
)
# Installation stays best-effort, but a failure is now visible instead of
# being an ignored return code.
if _pip_install.returncode != 0:
    print("pip install failed with exit code", _pip_install.returncode)
# Nativos
from dateutil.relativedelta import relativedelta
from time import gmtime, strftime
from datetime import datetime
import random as rn
import joblib
import time
import json
import sys
import os
import gc
#nube
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker import get_execution_role
import awswrangler as wr
import sagemaker
import boto3
#calculo
import pandas as pd
import numpy as np
import scipy
#grafico
from IPython.display import display
import matplotlib.pyplot as plt
import seaborn as sns
# IPython magic: render matplotlib figures inline in the notebook output.
%matplotlib inline
sns.set(style="whitegrid")
# Output interaction settings
import warnings
# Silence every warning category for the rest of the session.
warnings.filterwarnings("ignore")
# warnings.simplefilter(action='ignore', category=FutureWarning)
# Raise pandas' display limits: up to 500 rows/columns, 1000 characters wide.
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)
from IPython.core.interactiveshell import InteractiveShell
# Show the value of every expression statement in a cell, not only the last.
InteractiveShell.ast_node_interactivity = "all"
gc.collect()
# MODELS
##from lightgbm import LGBMClassifier
# Make the parent and then the grandparent of the working directory
# importable; BASE_DIR ends up bound to the grandparent directory.
for BASE_DIR in (
    os.path.dirname(os.getcwd()),
    os.path.dirname(os.path.dirname(os.getcwd())),
):
    if BASE_DIR not in sys.path:
        sys.path.append(BASE_DIR)
print("BASE_DIR::: ", BASE_DIR)
#import scorecardpy as sc

# One fixed seed shared by the stdlib and NumPy random generators.
SEED = 29082013
rn.seed(SEED)
np.random.seed(SEED)
# NOTE(review): PYTHONHASHSEED set at runtime is only picked up by child
# processes; the already-running interpreter keeps its hash seed.
os.environ['PYTHONHASHSEED'] = str(SEED)
# Star import: presumably the source of `bucket`, `s3` and `path_`, which are
# used below but never defined in this file — TODO confirm.
from utils_campania import *
# Bare expression: its value is shown as cell output in a notebook.
bucket
# Bucket handle that generate_table uses to delete existing objects.
bucket_s3 = s3.Bucket(bucket)
bucket_s3
# Database passed to the awswrangler catalog/Athena calls in generate_table.
esquema_vpc = 'd_mdl_vpc_disc'
def generate_table(esquema_vpc, query_, table, path, llave_,
                   formato='PARQUET', compresion='SNAPPY', grupo='athenav2'):
    """Drop and recreate the Athena CTAS table ``<table>_SG`` from ``query_``.

    Existing S3 objects under the table's location are deleted, both
    ``table`` and ``table + '_SG'`` are dropped from the catalog, and the
    suffixed table is rebuilt with ``wr.athena.create_ctas_table``.

    Relies on the module-level names ``bucket`` (bucket name) and
    ``bucket_s3`` (bucket resource).

    Args:
        esquema_vpc: Database that holds the tables.
        query_: SELECT statement used as the CTAS source.
        table: Base table name; the created table is ``table + '_SG'``.
        path: S3 location containing ``table``; every occurrence of
            ``table`` in it is replaced by the suffixed name.
        llave_: Column used as the single partitioning key.
        formato: Storage format passed to the CTAS call.
        compresion: Write compression passed to the CTAS call.
        grupo: Athena workgroup.

    Returns:
        Tuple ``(del_, result_)``: the result of dropping the suffixed table
        and the value returned by ``create_ctas_table``.

    Raises:
        ValueError: If the S3 prefix derived from ``path`` is empty, since
            deleting with an empty prefix would remove every object in the
            bucket.
    """
    # Derive the suffixed table name, its S3 location and the key prefix
    # (the part of the location after the bucket name and its separator).
    table_ = table + '_SG'
    path_ = path.replace(table, table_)
    path_del = path_.split(bucket)[-1][1:]
    if not path_del:
        raise ValueError(
            "Refusing to delete with an empty S3 prefix derived from "
            "path {!r}: it would match every object in the bucket".format(path)
        )
    # Remove the physical data.
    # NOTE(review): Prefix is a plain string match; if path has no trailing
    # slash, sibling keys sharing the prefix are deleted too — confirm.
    bucket_s3.objects.filter(Prefix=path_del).delete()
    print(table_, path_, path_del)
    # Drop both the base and the suffixed table from the catalog; only the
    # result for the suffixed table is returned.
    wr.catalog.delete_table_if_exists(database=esquema_vpc, table=table)
    del_ = wr.catalog.delete_table_if_exists(database=esquema_vpc, table=table_)
    # Presumably gives the deletions time to propagate — TODO confirm.
    time.sleep(5)
    # Recreate the table from the query.
    result_ = wr.athena.create_ctas_table(
        sql=query_,
        database=esquema_vpc,
        ctas_table=table_,
        wait=True,
        s3_output=path_,
        storage_format=formato,
        write_compression=compresion,
        partitioning_info=[llave_],
        athena_query_wait_polling_delay=0.5,
        boto3_session=boto3.Session(),
        workgroup=grupo
    )
    return del_, result_
def apply_create(table='X', path='X', llave='X', query='X'):
    """Rebuild a table in ``esquema_vpc`` and report the CTAS query status.

    Delegates to ``generate_table`` and returns a tuple of its deletion
    result and the ``'Status'`` entry of the CTAS query metadata payload.
    """
    deleted, ctas_result = generate_table(esquema_vpc, query, table, path, llave)
    query_metadata = ctas_result['ctas_query_metadata']
    return deleted, query_metadata.raw_payload['Status']
# Build HM_FEEDBACK_RUC_UNICO_SG (generate_table appends the '_SG' suffix):
# one row per (periodo_campania, num_ruc) with MAX-aggregated feedback flags
# and SUM totals, partitioned by p_periodo.
# `path_` is not defined in this file; it is used as a format template that
# receives the table name — presumably an S3 location from utils_campania.
# NOTE(review): the SUM(...) columns are not wrapped in coalesce, so they are
# NULL when every joined value is NULL, unlike the MAX(coalesce(..., 0)) flags.
table_ ='HM_FEEDBACK_RUC_UNICO'
apply_create(
table=table_,
path=path_.format(table_),
llave='p_periodo',
query="""
SELECT DD.periodo_campania, DD.num_ruc, --DD.PRODUCTO campania,
MAX(coalesce(FF.flg_nuevos_pre_feedback, 0)) flg_nuevos_pre,
MAX(coalesce(FF.flg_nuevos_ap_feedback, 0)) flg_nuevos_ap,
MAX(coalesce(FF.flg_gestionado_estricto, 0)) flg_gestionado_estricto,
MAX(coalesce(FF.flg_ce, 0)) flg_ce,
MAX(coalesce(FF.flg_cne, 0)) flg_cne,
MAX(coalesce(FF.flg_et, 0)) flg_et,
MAX(coalesce(FF.flg_no_acepta_campana, 0)) flg_no_acepta_campana,
MAX(coalesce(FF.flg_tasa_elevada, 0)) flg_tasa_elevada,
MAX(coalesce(FF.flg_lo_pensara, 0)) flg_lo_pensara,
MAX(coalesce(FF.flg_acepta_campana, 0)) flg_acepta_campana,
MAX(coalesce(FF.flg_no_califica, 0)) flg_no_califica,
SUM(FF.flg_gestionado_estricto) gestion_total,
SUM(FF.flg_ce) ce_total,
SUM(FF.flg_acepta_campana) ac_total,
DD.periodo_campania p_periodo
FROM d_mdl_vpc_disc.HM_BASE_DESPLIEGUE_FOR_INDICADORBPE_SG DD
INNER JOIN d_mdl_vpc_disc.HM_FEEDBACK_TLV_INDICADORBPE_SG FF
ON DD.periodo_campania = FF.gestion
--AND DD.PRODUCTO = FF.campania
AND DD.num_ruc = FF.num_ruc_autocompletado
GROUP BY DD.periodo_campania, DD.num_ruc--, DD.PRODUCTO
"""
)
# Sanity check on the table created above: per campaign period, the row
# count, the distinct RUC count, the sum of flg_ce and that sum as a
# percentage of distinct RUCs, newest period first.
# NOTE(review): the aggregate columns have no aliases, so their names in the
# resulting DataFrame are whatever Athena generates — confirm if used later.
# NOTE(review): database is "e_perm_aws" although the query names the
# d_mdl_vpc_disc schema explicitly — confirm this is intended.
df = wr.athena.read_sql_query("""
SELECT DD.periodo_campania, count(1), count(distinct(num_ruc)), sum(flg_ce), sum(flg_ce)*100.0/count(distinct(num_ruc))
FROM d_mdl_vpc_disc.HM_FEEDBACK_RUC_UNICO_SG DD
GROUP BY DD.periodo_campania
ORDER BY DD.periodo_campania DESC
""",
database="e_perm_aws",
ctas_approach=False
)
print(df.shape)
df.head(20)
Revise this Paste