Descrição
Nome do produto: Pressostato para Lavadora Electrolux LTE12 64786941 Emicol
Equipamentos: O Pressostato para Lavadora de Roupas é responsável por regular o nível da água da lavadora durante o processo de lavagem.
Peça Modelo: 64786941
Tipo:
})
# List the landing parquet files for this product, join each one against the
# currently-active mercado_livre feature rows, and rewrite tbl_ml_features.
hdfs_files = client.list('/datalake/landing/mercado_livre/pressostato', status=True)
print(hdfs_files[0])

# Read
hdfsls = []
for i in multithread(hdfs_files):
    _dt = dd.read_parquet('hdfs://' + i.strip())
    _pg = dd.read_sql_table('tbl_ml_features', dfengine, index_col='id', npartitions=5)
    # Keep only feature rows that have a matching parquet row AND are
    # active mercado_livre entries.
    _ft = _pg.join(_dt, how="inner")[['product_id', 'id', 'feature', 'source', 'flag_active', 'created']][(_pg.source == "mercado_livre") & (_pg.flag_active == True)]
    print("Lines to update:", _ft.shape[0])
    _ft.compute_chunk_sizes()
    hdfsls.append(_ft)
    # Drop per-file intermediates early to keep peak memory down.
    del _ft, _dt, _pg
    gc.collect()

# Update
# BUG FIX: pandas .to_sql() returns None, so the original chain
# .to_sql(...)['product_id'].to_list() raised TypeError.  Materialize the
# dask frame once with .compute() (the old .to_dataframes()[0] is not the
# dask DataFrame API), write it, then read the column from the same frame.
_updated = dd.concat(hdfsls).compute().set_index(keys=['id'])
_updated.to_sql(name='tbl_ml_features', con=rawengine, schema='crawler', if_exists='replace', dtype=dphyper)
updated_product_ids = _updated['product_id'].to_list()

# Close
dfengine.dispose()
rawengine.dispose()
del pt, dfengine, rawengine, client, hdfsls, hdfs_files
# NOTE(review): `finish`/`start` are presumably timestamps set outside this
# chunk — confirm they exist before this line runs.
print(abs(finish - start), "seconds of execution")
"""
Training.
"""
print("Training.")

# Open
dfengine = create_engine(DFURL, pool_size=POOL_SIZE, pool_recycle=180)
rawengine = create_engine(RAWURL, pool_size=POOL_SIZE, pool_recycle=180)
metadata = MetaData(dfengine, schema='crawler', reflect=True)

# Select
# BUG FIX: SQLAlchemy columns have no .in() method — the operator is .in_().
# NOTE(review): '6536' appears twice in the original id list; duplicates are
# harmless inside IN (...), so it is kept — confirm whether it was intended.
_categorias = metadata.tables['categorias']
pt = select(
    [_categorias],
    _categorias.c.id.in_(['6526', '6527', '6528', '6529', '6530', '6531',
                          '6532', '6533', '6534', '6535', '6536', '6537',
                          '6538', '6539', '6479', '6540', '8488', '6536',
                          '7169']),
    order_by=_categorias.c.id,
)

# Union
# BUG FIX: the original referenced an undefined name `_pt` (the select above
# was bound to `pt`) and looked up 'mlcategories' while selecting from
# 'ml_categories' — both normalized to the names that actually exist.
_ml_categories = metadata.tables['ml_categories']
pt = pt.union(
    select([_ml_categories]).where(_ml_categories.c.id.in_(['9998'])),
    all=True,
).union(select([metadata.tables['ml_brands']]), all=True)

# Shuffle
pt = pt.order_by(func.random()).cte('pt', recursive=True)

# Read
_dct = dd.read_sql_table('tbl_ml_dim_clf', dfengine, index_col='id', npartitions=5, columns=['id', 'id_joint', 'group', 'name'])
# One list of classifier names per id_joint, exposed as column "ppp".
ppp = _dct.groupby(by=['id_joint'])['name'].apply(list).reset_index(name="ppp").set_index(['id_joint'])
_lf = dd.read_sql_table('tbl_ml_history', rawengine, index_col='id', npartitions=5, columns=['id', 'slug', 'id_clf'])
_ft = dd.read_sql_table('ft_recent_1', rawengine, index_col='id', npartitions=5, columns=['id'])
# Append-to-history loop.  Each pass: restrict pt to recent items, anti-join
# against what is already in tbl_ml_history, attach the per-id_joint name
# lists, and append the remainder.  NOTE(review): the purpose of running 99
# identical passes is not visible here — presumably new rows appear between
# passes; confirm against the scheduler that feeds these tables.
for _nk in range(99):
    ft = dd.merge(pt, _ft.reset_index(level=0), left_on='id', right_on='id', how='inner')
    # indicator=True + 'left_only' below implements an anti-join: keep only
    # ids NOT already present in tbl_ml_history.
    ft = dd.merge(ft.reset_index(level=0), _lf.reset_index(level=0), on='id', how='outer', indicator=True)
    ft = (
        dd.merge(ft[ft['_merge'] == 'left_only'].reset_index(level=0), ppp,
                 left_on='id_joint', right_on='id_joint', how='inner')
        .drop(columns=['ppp_y', '_merge'])
        .rename(columns={'ppp_x': 'ppp'})
        .set_index(['id'])
    )
    ft.reset_index(level=0).to_sql('tbl_ml_history', rawengine, schema='crawler', index=False, if_exists='append', chunksize=5, method='multi', dtype=dphyper)
    del ft
    gc.collect()
# Close
# BUG FIX: the original deleted dfengine/rawengine (L59-60) and then called
# .dispose() on them later (L71-72), which raises NameError — the pools were
# never actually disposed.  The three duplicated "Close" sections are
# consolidated: dispose the connection pools first, then drop every
# reference and collect once.
dfengine.dispose()
rawengine.dispose()
_dct = _lf = _ft = ppp = metadata = None
del _dct, _lf, _ft, ppp, metadata
del dfengine, rawengine
gc.collect()