Welcome, guest! Login / Register - Why register?
Psst.. new poll here.
Psst.. new forums here.
Microsoft is blocking us again (TY IP Reputation!), so don't bother with any of their useless mail servers here and just use OAuth login instead. Thank the nice Russians for causing that. :)

Paste

Pasted as Python by registered user vvillacorta ( 2 years ago )
import pip

# Install / upgrade the extra packages this notebook needs.
# `pip.main` is pip's internal entry point and pip offers no supported
# in-process API; run pip as a subprocess of the *current* interpreter so the
# packages land in the same environment the kernel imports from. check_call
# raises CalledProcessError if pip fails (pip.main only returned a status code,
# which was ignored).
import subprocess
import sys

package_names = ['awswrangler', 'jupyterlab_execute_time']
subprocess.check_call(
    [sys.executable, '-m', 'pip', 'install', *package_names, '--upgrade']
)


# Nativos
from dateutil.relativedelta import relativedelta
from time import gmtime, strftime
from datetime import datetime
import random as rn
import joblib
import time
import json
import sys
import os
import gc

#nube
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker import get_execution_role
import awswrangler as wr
import sagemaker
import boto3


#calculo
import pandas as pd
import numpy as np
import scipy

#grafico
from IPython.display import display
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline
sns.set(style="whitegrid")

#Interacciones con output
import warnings
warnings.filterwarnings("ignore")
# warnings.simplefilter(action='ignore', category=FutureWarning)
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)

from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

gc.collect()
# MODELS
##from lightgbm import LGBMClassifier

# Put the notebook's parent and grandparent directories on the import path
# (presumably where project-local helper modules live — verify). The loop
# variable is left bound to the grandparent, which is the value printed.
_notebook_parent = os.path.dirname(os.getcwd())
for BASE_DIR in (_notebook_parent, os.path.dirname(_notebook_parent)):
    if BASE_DIR not in sys.path:
        sys.path.append(BASE_DIR)
print("BASE_DIR::: ", BASE_DIR)
#import scorecardpy as sc

# One seed shared by every randomness source used in this notebook.
SEED = 29082013

# NOTE: hash randomisation is fixed when the interpreter starts, so exporting
# PYTHONHASHSEED here only affects processes launched afterwards, not this one.
os.environ.update(PYTHONHASHSEED=f"{SEED}")

# Seed NumPy's global RNG first, then the stdlib `random` module.
for _seed_rng in (np.random.seed, rn.seed):
    _seed_rng(SEED)


from utils_campania import *
# Bare expression: shown as cell output in the notebook. `bucket` is not
# defined anywhere in the visible code; presumably it is the S3 bucket name
# provided by the `utils_campania` star import above — verify.
bucket



# Bucket handle used by generate_table() to delete objects under a prefix.
# `s3` is presumably a boto3 S3 resource from utils_campania — verify.
bucket_s3 = s3.Bucket(bucket)
bucket_s3


# Glue/Athena database passed by apply_create() to generate_table(), where the
# CTAS tables are dropped and recreated.
esquema_vpc = 'd_mdl_vpc_disc'

def generate_table(esquema_vpc, query_, table, path, llave_,
                   formato='PARQUET', compresion='SNAPPY', grupo='athenav2'):
    """Drop and recreate the Athena table ``table + '_SG'`` from a CTAS query.

    Steps: delete every S3 object under the ``_SG`` output location, drop
    both ``table`` and ``table + '_SG'`` from the Glue catalog, pause, then
    run *query_* as a partitioned CTAS through awswrangler.

    Args:
        esquema_vpc: Glue/Athena database in which the tables are dropped
            and created.
        query_: SELECT statement used as the body of the CTAS.
        table: Base table name; the table actually created is
            ``table + '_SG'``.
        path: ``s3://<bucket>/...`` location for *table*. Every occurrence of
            *table* in it is replaced by ``table + '_SG'`` to obtain the CTAS
            output location. NOTE(review): if *table* does not occur in
            *path*, the location is used unchanged and its objects are
            deleted.
        llave_: Partition column passed as ``partitioning_info``. Athena
            expects partition columns last in the SELECT list.
        formato: Storage format forwarded to ``create_ctas_table``.
        compresion: Write compression forwarded to ``create_ctas_table``.
        grupo: Athena workgroup the CTAS runs in.

    Returns:
        ``(del_, result_)``: the result of dropping ``table + '_SG'`` from the
        catalog, and the object returned by ``wr.athena.create_ctas_table``.

    Raises:
        ValueError: if the output location is not under ``s3://<bucket>/``
            (module-level ``bucket``) or resolves to the bucket root. These
            checks run before any deletion. An empty prefix would make the
            cleanup delete every object in the bucket.
    """
    # Name and location of the table being rebuilt (suffix '_SG').
    table_ = table + '_SG'
    path_ = path.replace(table, table_)

    # Key prefix to wipe inside `bucket_s3`. Strip the exact "s3://<bucket>/"
    # head instead of splitting on the bucket name. A split keeps whatever
    # follows the LAST occurrence of the name, which is wrong if the name
    # reappears inside the key, and it yields '' for a bucket-root path.
    bucket_uri = f"s3://{bucket}/"
    if not path_.startswith(bucket_uri):
        raise ValueError(
            f"output location {path_!r} is not inside bucket {bucket!r}"
        )
    path_del = path_[len(bucket_uri):]
    if not path_del:
        raise ValueError(
            f"refusing to delete every object in bucket {bucket!r}: "
            f"output location {path_!r} is the bucket root"
        )

    # Remove the physical data under the '_SG' location.
    bucket_s3.objects.filter(Prefix=path_del).delete()
    print(table_, path_, path_del)

    # Drop the catalog entries. NOTE(review): the first call also drops the
    # base table `table`, although only `table_` is recreated below. Confirm
    # this is intended. Only the second call's result is returned.
    del_ = wr.catalog.delete_table_if_exists(database=esquema_vpc, table=table)
    del_ = wr.catalog.delete_table_if_exists(database=esquema_vpc, table=table_)

    # Presumably lets the catalog/S3 deletions settle before the CTAS reuses
    # the same name and location — TODO confirm the 5 s pause is needed.
    time.sleep(5)

    # Recreate the table from the query, partitioned by `llave_`.
    result_ = wr.athena.create_ctas_table(
        sql=query_,
        database=esquema_vpc,
        ctas_table=table_,
        wait=True,
        s3_output=path_,
        storage_format=formato,
        write_compression=compresion,
        partitioning_info=[llave_],
        athena_query_wait_polling_delay=0.5,
        boto3_session=boto3.Session(),
        workgroup=grupo
    )
    return del_, result_

def apply_create(table='X', path='X', llave='X', query='X'):
    """Rebuild *table* via generate_table() in the module-level `esquema_vpc`.

    The ``'X'`` defaults look like placeholders; always pass all four
    arguments.

    Returns:
        ``(dropped, status)``: the catalog-drop result reported by
        generate_table(), and the ``'Status'`` entry of the CTAS query's raw
        Athena payload.
    """
    dropped, ctas_result = generate_table(esquema_vpc, query, table, path, llave)
    query_metadata = ctas_result['ctas_query_metadata']
    return dropped, query_metadata.raw_payload['Status']



# Rebuild HM_FEEDBACK_RUC_UNICO_SG (generate_table appends '_SG'):
# - Grain: one row per (periodo_campania, num_ruc) from
#   HM_BASE_DESPLIEGUE_FOR_INDICADORBPE_SG, inner-joined to
#   HM_FEEDBACK_TLV_INDICADORBPE_SG on period and RUC.
# - flg_* columns are the MAX over the matched feedback rows, with NULL -> 0.
# - *_total columns are plain SUMs, so they are NULL when every matched value
#   is NULL.
# - p_periodo repeats periodo_campania as the last column and is the
#   partition key.
# `path_` is not defined in the visible code; presumably a path template from
# utils_campania — verify.
table_ = "HM_FEEDBACK_RUC_UNICO"
feedback_ruc_sql = """
        SELECT DD.periodo_campania, DD.num_ruc, --DD.PRODUCTO campania,
         MAX(coalesce(FF.flg_nuevos_pre_feedback, 0)) flg_nuevos_pre,
         MAX(coalesce(FF.flg_nuevos_ap_feedback, 0)) flg_nuevos_ap,
         MAX(coalesce(FF.flg_gestionado_estricto, 0)) flg_gestionado_estricto,
         MAX(coalesce(FF.flg_ce, 0)) flg_ce,
         MAX(coalesce(FF.flg_cne, 0)) flg_cne,
         MAX(coalesce(FF.flg_et, 0)) flg_et,
         MAX(coalesce(FF.flg_no_acepta_campana, 0)) flg_no_acepta_campana,
         MAX(coalesce(FF.flg_tasa_elevada, 0)) flg_tasa_elevada,
         MAX(coalesce(FF.flg_lo_pensara, 0)) flg_lo_pensara,
         MAX(coalesce(FF.flg_acepta_campana, 0)) flg_acepta_campana,
         MAX(coalesce(FF.flg_no_califica, 0)) flg_no_califica,
         SUM(FF.flg_gestionado_estricto) gestion_total,    
         SUM(FF.flg_ce) ce_total,
         SUM(FF.flg_acepta_campana) ac_total,
         DD.periodo_campania p_periodo
    FROM d_mdl_vpc_disc.HM_BASE_DESPLIEGUE_FOR_INDICADORBPE_SG DD
    INNER JOIN d_mdl_vpc_disc.HM_FEEDBACK_TLV_INDICADORBPE_SG FF
    ON DD.periodo_campania = FF.gestion
    --AND DD.PRODUCTO = FF.campania
    AND DD.num_ruc = FF.num_ruc_autocompletado
    GROUP BY DD.periodo_campania, DD.num_ruc--, DD.PRODUCTO
    """
# Left as a bare expression so the notebook displays the (drop, status) result.
apply_create(
    table=table_,
    path=path_.format(table_),
    llave="p_periodo",
    query=feedback_ruc_sql,
)


# Per-period sanity check of HM_FEEDBACK_RUC_UNICO_SG. For each period it
# reports total rows, distinct RUCs, sum of flg_ce, and that sum as a
# percentage of distinct RUCs.
# - The aggregates are unaliased, so Athena returns default column names.
# - The table is schema-qualified in the SQL, so `database` only sets the
#   default schema.
summary_sql = """
        SELECT DD.periodo_campania, count(1), count(distinct(num_ruc)), sum(flg_ce), sum(flg_ce)*100.0/count(distinct(num_ruc))
        FROM d_mdl_vpc_disc.HM_FEEDBACK_RUC_UNICO_SG DD
        GROUP BY DD.periodo_campania
        ORDER BY DD.periodo_campania DESC
    """
# ctas_approach=False: read the plain query result instead of staging it
# through a temporary CTAS table.
df = wr.athena.read_sql_query(summary_sql, database="e_perm_aws", ctas_approach=False)
print(df.shape)
# Bare expression so the notebook displays the first rows.
df.head(20)

 

Revise this Paste

Your Name: Code Language: