如何使用Python讀取Hive數據庫_第1頁
如何使用Python讀取Hive數據庫_第2頁
如何使用Python讀取Hive數據庫_第3頁
如何使用Python讀取Hive數據庫_第4頁
如何使用Python讀取Hive數據庫_第5頁
已閱讀5頁,還剩44頁未讀 繼續免費閱讀

下載本文檔

版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領

文檔簡介

第如何使用Python讀取Hive數據庫?port=21051,

database=ur_AI_dw,

auth_mechanism=LDAP,

user=urbi,

password=Ur#730xd,

logger:logging.Logger=None

self.host=host

self.port=port

self.database=database

self.auth_mechanism=auth_mechanism

self.user=user

self.password=password

self.logger=logger

self.impala_conn=None

self.conn=None

self.cursor=None

self.engine=None

self.session=None

def create_table_code(self, file_name):
    """Generate table/model source code for the database with ``sqlacodegen``.

    The extracted source lost its quotes, spaces and ``>`` characters, so the
    exact command line is reconstructed: the tool is assumed to have written
    its output to ``file_name``.

    Args:
        file_name: Path of the Python file that receives the generated code.

    Returns:
        The currently cached connection (``self.conn``), which may be ``None``.

    Raises:
        AttributeError: If ``self.connection_str`` has not been set.
    """
    import subprocess

    # NOTE(review): ``connection_str`` is not assigned in the visible part of
    # __init__; fail with an explicit message instead of an opaque error.
    connection_str = getattr(self, "connection_str", None)
    if connection_str is None:
        raise AttributeError(
            "connection_str is not set; cannot run sqlacodegen"
        )
    # Argument list instead of a shell string: no quoting/injection problems
    # when the URL or the file name contains shell metacharacters.
    subprocess.run(
        ["sqlacodegen", str(connection_str), "--outfile", str(file_name)],
        check=False,  # the original ignored the exit status as well
    )
    return self.conn

def get_conn(self):
    """Return the cached engine connection, opening it on first use.

    Returns:
        The object returned by ``self.get_engine().connect()``; the same
        object is returned on later calls until it is reset to ``None``.
    """
    if self.conn is None:
        self.conn = self.get_engine().connect()
    return self.conn

def get_impala_conn(self):
    """Return the cached DB-API connection, creating it on first use.

    ``connect`` is the module-level name used by the original code; it is
    presumably impyla's ``impala.dbapi.connect`` -- confirm against the
    file's imports, which are outside the visible range.
    """
    if self.impala_conn is None:
        self.impala_conn = connect(
            host=self.host,
            port=self.port,
            database=self.database,
            auth_mechanism=self.auth_mechanism,
            user=self.user,
            password=self.password,
        )
    return self.impala_conn

def get_engine(self):
    """Return the cached SQLAlchemy engine, creating it on first use.

    The engine is built with ``creator=self.get_impala_conn`` so that it
    obtains its DB-API connection from this helper rather than from the URL.
    """
    if self.engine is None:
        self.engine = sqlalchemy.create_engine(
            "impala://", creator=self.get_impala_conn
        )
    return self.engine

def get_cursor(self):
    """Return the cached DB-API cursor, creating it on first use.

    The original dereferenced ``self.conn`` directly, which is ``None`` until
    ``get_conn()`` has been called and is an engine-level connection rather
    than a DB-API one. The cursor is now taken from ``get_impala_conn()``,
    which creates the DB-API connection on demand.
    """
    if self.cursor is None:
        self.cursor = self.get_impala_conn().cursor()
    return self.cursor

def get_session(self) -> "sqlalchemy.orm.Session":
    """Return the cached ORM session, creating it on first use.

    The original annotation named ``sessionmaker``, but the value returned is
    the session *instance* produced by calling the factory, so the annotation
    is corrected (as a string, to avoid requiring a new import).
    """
    if self.session is None:
        session_factory = sessionmaker(bind=self.get_engine())
        self.session = session_factory()
    return self.session

def close_conn(self):
    """Close the engine connection and release everything beneath it.

    Closes, in order: the cached cursor (added -- a cursor must not outlive
    its connection), the engine connection, the engine, and the DB-API
    connection. Each step is a no-op when the resource was never opened, so
    the cleanup runs even if ``self.conn`` itself is ``None``.
    """
    self.close_cursor()
    if self.conn is not None:
        self.conn.close()
        self.conn = None
    self.dispose_engine()
    self.close_impala_conn()

def close_impala_conn(self):
    """Close the cached DB-API connection, if any, and forget it."""
    if self.impala_conn is None:
        return
    self.impala_conn.close()
    self.impala_conn = None

def close_session(self):
    """Close the cached ORM session, if any, then dispose of the engine.

    ``dispose_engine()`` is itself a no-op when no engine exists, so it is
    called unconditionally.
    """
    if self.session is not None:
        self.session.close()
        self.session = None
    self.dispose_engine()

def dispose_engine(self):
    """Dispose of the cached engine, if any, and forget it."""
    if self.engine is None:
        return
    self.engine.dispose()
    self.engine = None

def close_cursor(self):
    """Close the cached cursor, if any, and forget it."""
    if self.cursor is None:
        return
    self.cursor.close()
    self.cursor = None

def get_data(self, sql, auto_close=True, retries=3, retry_wait=60) -> pd.DataFrame:
    """Run a query and return the result as a DataFrame, retrying on failure.

    Args:
        sql: Query text passed to ``pd.read_sql``.
        auto_close: When true, ``close_conn()`` is called before returning,
            whether the query succeeded or not.
        retries: Total number of attempts (the original hard-coded 3).
        retry_wait: Seconds to sleep between attempts (the original
            hard-coded 60).

    Returns:
        The DataFrame produced by ``pd.read_sql``.

    Raises:
        Exception: Whatever the last attempt raised, after it has been logged
            (when a logger is configured).
    """
    conn = self.get_conn()
    data = None
    attempts = max(1, retries)
    try:
        for attempt in range(attempts):
            try:
                data = pd.read_sql(sql, conn)
                break
            except Exception:
                if attempt == attempts - 1:
                    raise  # bare raise keeps the original traceback
                time.sleep(retry_wait)
    except Exception as ex:
        # The logger defaults to None; without this guard the AttributeError
        # from ``None.exception`` would replace the real query error.
        if self.logger is not None:
            self.logger.exception(ex)
        raise
    finally:
        if auto_close:
            self.close_conn()
    return data

class VarsHelper():
    """Small key/value store persisted to a single pickle file.

    NOTE(review): the file is read with ``pickle.load``; only point
    ``save_dir`` at files this program wrote itself, because unpickling
    untrusted data can execute arbitrary code.
    """

    def __init__(self, save_dir, auto_save=True):
        """Load existing values from ``save_dir`` if the file exists.

        Args:
            save_dir: Path of the pickle *file* (despite the name).
            auto_save: When true, every ``set_value`` rewrites the file.
        """
        self.save_dir = save_dir
        self.auto_save = auto_save
        self.values = {}
        parent = os.path.dirname(self.save_dir)
        # A bare file name has an empty dirname; os.makedirs('') would raise.
        if parent and not os.path.exists(parent):
            os.makedirs(parent)
        if os.path.exists(self.save_dir):
            with open(self.save_dir, "rb") as f:
                self.values = pickle.load(f)

    def set_value(self, key, value):
        """Store ``value`` under ``key`` and save when ``auto_save`` is on."""
        self.values[key] = value
        if self.auto_save:
            self.save_file()

    def get_value(self, key):
        """Return the value stored under ``key``; raises KeyError if absent."""
        return self.values[key]

    def has_key(self, key):
        """Return True when ``key`` is present."""
        return key in self.values

    def save_file(self):
        """Write all values to the pickle file."""
        with open(self.save_dir, "wb") as f:
            pickle.dump(self.values, f)

class GlobalShareArgs():
    """Process-wide settings dictionary shared through class attributes.

    The helpers are declared ``@staticmethod`` so they work both on the class
    (as before) and on an instance.
    """

    args = {
        "debug": False,
    }

    @staticmethod
    def get_args():
        """Return the shared settings dictionary itself (not a copy)."""
        return GlobalShareArgs.args

    @staticmethod
    def set_args(args):
        """Replace the shared settings dictionary with ``args``."""
        GlobalShareArgs.args = args

    @staticmethod
    def set_args_value(key, value):
        """Set one setting."""
        GlobalShareArgs.args[key] = value

    @staticmethod
    def get_args_value(key, default_value=None):
        """Return one setting, or ``default_value`` when it is absent."""
        return GlobalShareArgs.args.get(key, default_value)

    @staticmethod
    def contain_key(key):
        """Return True when ``key`` is a known setting."""
        return key in GlobalShareArgs.args

    @staticmethod
    def update(args):
        """Merge ``args`` into the shared settings."""
        GlobalShareArgs.args.update(args)

class ShareArgs():
    """Settings dictionary for this job, shared through class attributes.

    The helpers are declared ``@staticmethod`` so they work both on the class
    (as before) and on an instance.
    """

    args = {
        "labels_dir": "./hjx/shop_group/month_w_amt/data/labels",  # label directory
        "labels_output_dir": "./hjx/shop_group/month_w_amt/data/labels_output",  # exported cluster labels
        "common_datas_dir": "./hjx/data",  # shared data directory (common ur_bi_dw extracts)
        "only_predict": False,  # predict only, do not train
        "delete_model": True,  # delete the model first; training only
        "export_excel": False,  # export to Excel
        "classes": 12,  # number of clusters
        "batch_size": 16,
        "hidden_size": 32,
        "max_nrof_epochs": 100,
        "learning_rate": 0.0005,
        "loss_type": "categorical_crossentropy",
        "avg_model_num": 10,
        "steps_per_epoch": 4.0,
        "lr_callback_patience": 4,
        "lr_callback_cooldown": 1,
        "early_stopping_callback_patience": 6,
        "get_data": True,
    }

    @staticmethod
    def get_args():
        """Return the shared settings dictionary itself (not a copy)."""
        return ShareArgs.args

    @staticmethod
    def set_args(args):
        """Replace the shared settings dictionary with ``args``."""
        ShareArgs.args = args

    @staticmethod
    def set_args_value(key, value):
        """Set one setting."""
        ShareArgs.args[key] = value

    @staticmethod
    def get_args_value(key, default_value=None):
        """Return one setting, or ``default_value`` when it is absent."""
        return ShareArgs.args.get(key, default_value)

    @staticmethod
    def contain_key(key):
        """Return True when ``key`` is a known setting."""
        return key in ShareArgs.args

    @staticmethod
    def update(args):
        """Merge ``args`` into the shared settings."""
        ShareArgs.args.update(args)

class UrBiGetDatasBase():
    """Base class for extract jobs that cache query results as CSV files.

    Provides per-path locks, a freshness check so a path is re-queried only
    after a timeout, and a date-window loop that writes one CSV per window.

    NOTE(review): the source was recovered from a text extraction that
    dropped quotes, spaces, comparison operators and the callee of several
    log calls. Comparisons (``>``, ``<``) and log calls below are
    reconstructions -- confirm against the original file.
    """

    # Locks shared by all instances, keyed by absolute save path.
    lock_dict: Dict[str, threading.Lock] = {}
    # Time of the last successful refresh per absolute save path.
    time_dict: Dict[str, datetime.datetime] = {}
    # Result of the latest freshness check per absolute save path.
    get_data_timeout_dict: Dict[str, bool] = {}

    def __init__(
        self,
        host=2,  # NOTE(review): looks like the residue of a redacted address
        port=21051,
        database="ur_ai_dw",
        auth_mechanism="LDAP",
        user="urbi",
        # SECURITY: hard-coded credential kept only for interface
        # compatibility; pass it in from configuration instead.
        password="Ur#730xd",
        save_dir=None,
        logger: logging.Logger = None,
    ):
        """Create the database helper and the output directory.

        Args:
            host, port, database, auth_mechanism, user, password: Forwarded
                unchanged to ``HiveHelper``.
            save_dir: Output directory; created when it does not exist.
            logger: Optional logger; logging is skipped when ``None``.
        """
        self.save_dir = save_dir
        self.logger = logger
        self.db_helper = HiveHelper(
            host=host,
            port=port,
            database=database,
            auth_mechanism=auth_mechanism,
            user=user,
            password=password,
            logger=logger,
        )
        # Create the output directory.
        if self.save_dir is not None and not os.path.exists(self.save_dir):
            os.makedirs(self.save_dir)
        # In debug mode the refresh times are persisted across runs.
        self.vars_helper = None
        if GlobalShareArgs.get_args_value("debug"):
            self.vars_helper = VarsHelper("./hjx/data/vars/UrBiGetDatas")

    def _log_info(self, message, *args):
        """Log at info level when a logger is configured."""
        if self.logger is not None:
            self.logger.info(message, *args)

    def close(self):
        """Close the database connection."""
        self.db_helper.close_conn()

    def get_last_time(self, key_name) -> bool:
        """Return True when the data behind ``key_name`` must be re-queried.

        A path is stale when it has never been refreshed or when the last
        refresh is older than 12 hours (24 hours in debug mode). The result
        is also recorded for ``save_last_time``.
        """
        # Absolute path, so the same target always maps to the same key.
        key_name = os.path.abspath(key_name)
        if self.vars_helper is not None and self.vars_helper.has_key("UrBiGetDatasBase.time_list"):
            UrBiGetDatasBase.time_dict = self.vars_helper.get_value("UrBiGetDatasBase.time_list")
        timeout = 12  # hours
        if GlobalShareArgs.get_args_value("debug"):
            timeout = 24  # hours
        last_time = UrBiGetDatasBase.time_dict.get(key_name)
        get_data_timeout = (
            last_time is None
            or (datetime.datetime.today() - last_time).total_seconds() > timeout * 60 * 60
        )
        if get_data_timeout:
            self._log_info("超時%d小時,重新查數據:%s", timeout, key_name)
        else:
            self._log_info("未超時%d小時,跳過查數據:%s", timeout, key_name)
        UrBiGetDatasBase.get_data_timeout_dict[key_name] = get_data_timeout
        return get_data_timeout

    def save_last_time(self, key_name):
        """Record "now" as the refresh time of ``key_name``.

        Only acts when the latest ``get_last_time`` for this path reported a
        timeout; a path that was never checked is treated as "no timeout"
        instead of raising ``KeyError``.
        """
        key_name = os.path.abspath(key_name)
        if not UrBiGetDatasBase.get_data_timeout_dict.get(key_name, False):
            return
        UrBiGetDatasBase.time_dict[key_name] = datetime.datetime.today()
        if self.vars_helper is not None:
            self.vars_helper.set_value("UrBiGetDatasBase.time_list", UrBiGetDatasBase.time_dict)

    def get_lock(self, key_name) -> threading.Lock:
        """Return the lock shared by everything that writes to ``key_name``."""
        key_name = os.path.abspath(key_name)
        # setdefault is a single dict operation, so two threads asking for a
        # new key at the same time still end up with the same lock object
        # (the original check-then-assign could hand out two different ones).
        return UrBiGetDatasBase.lock_dict.setdefault(key_name, threading.Lock())

    def get_data_of_date(
        self,
        save_dir,
        sql,
        sort_columns: List[str],
        del_index_list=(-1,),  # indexes (in sorted order) of files to re-fetch
        start_date=datetime.datetime(2017, 1, 1),  # first window start
        offset=relativedelta(months=3),  # window length
        date_format_fun=lambda d: "%04d%02d01" % (d.year, d.month),  # date -> SQL parameter
        filename_format_fun=lambda d: "%04d%02d.csv" % (d.year, d.month),  # date -> file name
        stop_date="20700101",  # safety stop for the window loop
        data_format_fun=None,  # optional DataFrame post-processing
    ):
        """Fetch data window by window, writing one CSV per window.

        Existing files are kept; the files selected by ``del_index_list`` are
        deleted first so the most recent windows are fetched again. The loop
        stops at the first empty window after at least one non-empty one.

        Args:
            save_dir: Directory receiving the CSV files.
            sql: Query with two ``%s`` placeholders (window start, window end).
            sort_columns: Columns to sort each window by before writing.
            del_index_list: Indexes into the sorted file list to delete;
                indexes outside the list are ignored.
            start_date: Start of the first window.
            offset: Length of each window.
            date_format_fun: Formats a window boundary for the query.
            filename_format_fun: Formats a window start as a file name.
            stop_date: Formatted date after which an all-empty run is an error.
            data_format_fun: Optional callable applied to each DataFrame.

        Raises:
            Exception: When no window has returned data and the window start
                has passed ``stop_date``.
        """
        if not os.path.exists(save_dir):
            os.makedirs(save_dir)
        else:
            # Drop the newest file(s) so they are fetched again.
            file_list = sorted(os.listdir(save_dir))
            doomed = []
            for del_index in del_index_list:
                # Fewer files than requested indexes must not raise IndexError.
                if -len(file_list) <= del_index < len(file_list):
                    name = file_list[del_index]
                    if name not in doomed:
                        doomed.append(name)
            for name in doomed:
                os.remove(os.path.join(save_dir, name))
                print("刪除最后一個文件:", name)

        select_index = -1  # stays -1 until a window has returned rows
        while True:
            end_date = start_date + offset
            start_date_str = date_format_fun(start_date)
            end_date_str = date_format_fun(end_date)
            self._log_info("date:%s-%s", start_date_str, end_date_str)
            file_path = os.path.join(save_dir, filename_format_fun(start_date))
            if not os.path.exists(file_path):
                data: pd.DataFrame = self.db_helper.get_data(sql % (start_date_str, end_date_str))
                if data is None:
                    break
                self._log_info("data:%d", len(data))
                if len(data) > 0:
                    select_index += 1
                    if data_format_fun is not None:
                        data = data_format_fun(data)
                    data = data.sort_values(sort_columns)
                    data.to_csv(file_path)
                elif select_index != -1:
                    break  # first empty window after real data: finished
                elif stop_date < start_date_str:
                    raise Exception("讀取數據異常,時間超出最大值!")
            start_date = end_date

class UrBiGetDatas(UrBiGetDatasBase):
    """Extract jobs for the shared ``ur_bi_dw`` tables.

    Every public ``get_*`` method follows the same protocol: take the lock
    for its target path, skip if the cached data is still fresh (12 hours,
    24 in debug mode -- see ``get_last_time``), run the query, write CSV and
    record the refresh time. That protocol lives in ``_refresh_guarded``.

    NOTE(review): SQL text was recovered from an extraction that removed
    spaces, quotes and ``<``/``>``; date filters are reconstructed as
    ``>= start AND < end`` and string literals re-quoted. Confirm against
    the original file.
    """

    def __init__(
        self,
        host=2,  # NOTE(review): looks like the residue of a redacted address
        port=21051,
        database="ur_ai_dw",
        auth_mechanism="LDAP",
        user="urbi",
        # SECURITY: hard-coded credential kept only for interface
        # compatibility; pass it in from configuration instead.
        password="Ur#730xd",
        save_dir="./hjx/data/ur_bi_dw_data",
        logger: logging.Logger = None,
    ):
        """Forward all settings to the base class."""
        self.save_dir = save_dir
        self.logger = logger
        super().__init__(
            host=host,
            port=port,
            database=database,
            auth_mechanism=auth_mechanism,
            user=user,
            password=password,
            save_dir=save_dir,
            logger=logger,
        )

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    def _refresh_guarded(self, key_path, job):
        """Run ``job`` under the lock of ``key_path`` if the data is stale."""
        with self.get_lock(key_path):
            try:
                if not self.get_last_time(key_path):
                    return
                job()
                self.save_last_time(key_path)
            except Exception as ex:
                # The logger is optional; never let logging hide the error.
                if self.logger is not None:
                    self.logger.exception(ex)
                raise

    def _dump_query(self, file_name, sql, rename=None, sort_columns=None, index=True):
        """Write the result of one query to ``save_dir/file_name``."""
        file_path = os.path.join(self.save_dir, file_name)

        def job():
            data: pd.DataFrame = self.db_helper.get_data(sql)
            if rename is not None:
                data = data.rename(columns={c: rename(c) for c in list(data.columns)})
            if sort_columns is not None:
                data = data.sort_values(sort_columns)
            data.to_csv(file_path, index=index)

        self._refresh_guarded(file_path, job)

    def _dump_by_date(self, dir_name, sql, sort_columns, **options):
        """Write a date-windowed query to ``save_dir/dir_name`` (one CSV per window)."""
        sub_dir = os.path.join(self.save_dir, dir_name)
        self._refresh_guarded(
            sub_dir,
            lambda: self.get_data_of_date(
                save_dir=sub_dir, sql=sql, sort_columns=sort_columns, **options
            ),
        )

    @staticmethod
    def _prefixer(prefix):
        """Return a column renamer that prepends ``prefix``."""
        return lambda column: prefix + column

    @staticmethod
    def _stripper(prefix):
        """Return a column renamer that removes ``prefix`` wherever it occurs."""
        return lambda column: column.replace(prefix, "")

    # ------------------------------------------------------------------
    # extract jobs
    # ------------------------------------------------------------------
    def get_dim_date(self):
        """Date dimension."""
        self._dump_query(
            "ur_bi_dw.dim_date.csv",
            "SELECT * FROM ur_bi_dw.dim_date",
            rename=self._prefixer("dim_date."),
            sort_columns=["dim_date.date_key"],
        )

    def get_dim_shop(self):
        """Shop dimension."""
        self._dump_query(
            "ur_bi_dw.dim_shop.csv",
            "SELECT * FROM ur_bi_dw.dim_shop",
            rename=self._prefixer("dim_shop."),
            sort_columns=["dim_shop.shop_no"],
        )

    def get_dim_vip(self):
        """Member (VIP) dimension, fetched in one-year windows."""
        sql = """SELECT dv.*, dd.date_key, dd.date_name2
            FROM ur_bi_dw.dim_vip as dv
            INNER JOIN ur_bi_dw.dim_date as dd
            ON dv.card_create_date = dd.date_name2
            where dd.date_key >= %s
            and dd.date_key < %s"""
        self._dump_by_date(
            "vip_no",
            sql,
            ["dv.vip_no"],
            start_date=datetime.datetime(2017, 1, 1),
            offset=relativedelta(years=1),
        )

    def get_weather(self):
        """Weather data, fetched in date windows; the two newest files are re-fetched."""
        sql = """
            select weather.* from ur_bi_ods.ods_base_weather_data_1200 as weather
            where weather.date_key >= %s and weather.date_key < %s
            """
        add_prefix = self._prefixer("weather.")

        def data_format_fun(data):
            return data.rename(columns={c: add_prefix(c) for c in list(data.columns)})

        self._dump_by_date(
            "weather",
            sql,
            ["weather.date_key", "weather.areaid"],
            del_index_list=[-2, -1],
            data_format_fun=data_format_fun,
        )

    def get_weather_city(self):
        """Weather city dimension."""
        self._dump_query(
            "ur_bi_dw.weather_city.csv",
            "SELECT * FROM ur_bi_dw.dim_weather_city as weather_city",
            rename=self._prefixer("weather_city."),
        )

    def get_dim_goods(self):
        """Goods dimension."""
        self._dump_query(
            "ur_bi_dw.dim_goods.csv",
            "SELECT * FROM ur_bi_dw.dim_goods",
            rename=self._prefixer("dim_goods."),
        )

    def get_dim_goods_market_shop_date(self):
        """Per-shop goods life-cycle data."""
        sql = """
            select shop_no, sku_no, shop_market_date, lifecycle_end_date, lifecycle_days
            FROM ur_bi_dw.dim_goods_market_shop_date
            where lifecycle_end_date is not null
            """
        self._dump_query(
            "ur_bi_dw.dim_goods_market_shop_date.csv",
            sql,
            # NOTE(review): this prefix is what the source strips; it looks
            # like a copy-paste slip, kept unchanged.
            rename=self._stripper("lifecycle_end_date."),
            sort_columns=["shop_market_date"],
            index=False,
        )

    def get_dim_goods_market_date(self):
        """Nation-wide goods life-cycle data."""
        sql = """
            select * FROM ur_bi_dw.dim_goods_market_date
            """
        self._dump_query(
            "ur_bi_dw.dim_goods_market_date.csv",
            sql,
            rename=self._prefixer("dim_goods_market_date."),
            sort_columns=["dim_goods_market_date.sku_no"],
            index=False,
        )

    def get_dim_goods_color_dev_sizes(self):
        """Developed sizes per goods colour."""
        self._dump_query(
            "dim_goods_color_dev_sizes.csv",
            "SELECT * FROM ur_bi_dm.dim_goods_color_dev_sizes",
            rename=self._stripper("dim_goods_color_dev_sizes."),
            index=False,
        )

    def get_dwd_daily_sales_size(self):
        """Actual daily sales, aggregated per shop/sku/date/size."""
        sql = """
            select shop_no, sku_no, date_key, `size`,
                sum(tag_price) as `tag_price`,
                sum(sales_qty) as `sales_qty`,
                sum(sales_tag_amt) as `sales_tag_amt`,
                sum(sales_amt) as `sales_amt`,
                count(0) as `sales_count`
            from ur_bi_dw.dwd_daily_sales_size as sales
            where sales.date_key >= %s and sales.date_key < %s
                and sales.currency_code = 'CNY'
            group by shop_no, sku_no, date_key, `size`
            """
        self._dump_by_date(
            "dwd_daily_sales_size_all",
            sql,
            ["date_key", "shop_no", "sku_no"],
            start_date=datetime.datetime(2017, 1, 1),
        )

    def get_dwd_daily_delivery_size(self):
        """Actual daily deliveries, aggregated per shop/sku/date/size."""
        sql = """
            select shop_no, sku_no, date_key, `size`,
                sum(delivery.shop_distr_received_qty) as `shop_distr_received_qty`,
                sum(delivery.shop_distr_received_amt) as `shop_distr_received_amt`,
                sum(delivery.online_distr_received_qty) as `online_distr_received_qty`,
                sum(delivery.online_distr_received_amt) as `online_distr_received_amt`,
                sum(delivery.pr_received_qty) as `pr_received_qty`,
                count(0) as `delivery_count`
            from ur_bi_dw.dwd_daily_delivery_size as delivery
            where delivery.date_key >= %s and delivery.date_key < %s
                and delivery.currency_code = 'CNY'
            group by shop_no, sku_no, date_key, `size`
            """
        self._dump_by_date(
            "dwd_daily_delivery_size_all",
            sql,
            ["date_key", "shop_no", "sku_no"],
            start_date=datetime.datetime(2017, 1, 1),
        )

    def get_v_last_nation_sales_status(self):
        """Best-/slow-seller status per goods."""
        self._dump_query(
            "v_last_nation_sales_status.csv",
            "SELECT * FROM ur_bi_dw.v_last_nation_sales_status",
            rename=self._stripper("v_last_nation_sales_status."),
            index=False,
        )

    def get_dwd_daily_finacial_goods(self):
        """Latest cost price per sku/size (CNY, CN)."""
        sql = """
            select t1.sku_no, t1.`size`, t1.cost_tax_incl from ur_bi_dw.dwd_daily_finacial_goods as t1
            inner join (
                select sku_no, `size`, max(date_key) as date_key
                from ur_bi_dw.dwd_daily_finacial_goods
                where currency_code = 'CNY' and country_code = 'CN'
                group by sku_no, `size`
            ) as t2
            on t2.sku_no = t1.sku_no
                and t2.`size` = t1.`size`
                and t2.date_key = t1.date_key
            where t1.currency_code = 'CNY' and t1.country_code = 'CN'
            """
        self._dump_query(
            "dwd_daily_finacial_goods.csv",
            sql,
            rename=self._stripper("t1."),
            index=False,
        )

    def get_dim_size_group(self):
        """Size mapping data."""
        self._dump_query(
            "dim_size_group.csv",
            "select * from ur_bi_dw.dim_size_group",
            rename=self._stripper("dim_size_group."),
            index=False,
        )

def get_common_datas(
        host=2,  # NOTE(review): looks like the residue of a redacted address
        port=21051,
        database="ur_ai_dw",
        auth_mechanism="LDAP",
        user="urbi",
        # SECURITY: hard-coded credential kept only for interface
        # compatibility; pass it in from configuration instead.
        password="Ur#730xd",
        logger: logging.Logger = None):
    """Refresh the shared extracts under ``<common_datas_dir>/ur_bi_data``.

    Runs, in order: dates, shops, weather, weather cities, goods and daily
    sales. The connection is closed afterwards whether or not a step failed.

    NOTE(review): the progress messages were bare parenthesised strings in
    the extracted source; they are reconstructed as info-level log calls.

    Raises:
        Exception: Whatever a step raised, after logging it (when a logger
            is given).
    """
    def info(message):
        if logger is not None:
            logger.info(message)

    # Shared directory
    common_datas_dir = ShareArgs.get_args_value("common_datas_dir")
    common_ur_bi_dir = os.path.join(common_datas_dir, "ur_bi_data")
    ur_bi_get_datas = UrBiGetDatas(
        host=host,
        port=port,
        database=database,
        auth_mechanism=auth_mechanism,
        user=user,
        password=password,
        save_dir=common_ur_bi_dir,
        logger=logger,
    )
    try:
        info("正在查詢日期數據...")
        ur_bi_get_datas.get_dim_date()
        info("查詢日期數據完成!")

        info("正在查詢店鋪數據...")
        ur_bi_get_datas.get_dim_shop()
        info("查詢店鋪數據完成!")

        info("正在查詢天氣數據...")
        ur_bi_get_datas.get_weather()
        info("查詢天氣數據完成!")

        info("正在查詢天氣城市數據...")
        ur_bi_get_datas.get_weather_city()
        info("查詢天氣城市數據完成!")

        info("正在查詢貨品數據...")
        ur_bi_get_datas.get_dim_goods()
        info("查詢貨品數據完成!")

        info("正在查詢實際銷量數據...")
        ur_bi_get_datas.get_dwd_daily_sales_size()
        info("查詢實際銷量數據完成!")
    except Exception as ex:
        # ``logger`` defaults to None; do not replace the real error with an
        # AttributeError from the logging call.
        if logger is not None:
            logger.exception(ex)
        raise
    finally:
        ur_bi_get_datas.close()

class CustomUrBiGetDatas(UrBiGetDatasBase):
    """Extract jobs specific to this project (sales goals, shop/series area).

    Each job takes the lock for its target file, skips when the cached data
    is still fresh (12 hours, 24 in debug mode -- see ``get_last_time``),
    otherwise queries, writes CSV and records the refresh time.

    NOTE(review): SQL text and the rename mappings were recovered from an
    extraction that removed spaces and quotes; string literals such as
    ``'Y'``/``'W'`` are re-quoted by inference. Confirm against the original.
    """

    def __init__(
        self,
        host=2,  # NOTE(review): looks like the residue of a redacted address
        port=21051,
        database="ur_ai_dw",
        auth_mechanism="LDAP",
        user="urbi",
        # SECURITY: hard-coded credential kept only for interface
        # compatibility; pass it in from configuration instead.
        password="Ur#730xd",
        save_dir="./hjx/data/ur_bi_data",
        logger: logging.Logger = None,
    ):
        """Forward all settings to the base class."""
        self.save_dir = save_dir
        self.logger = logger
        super().__init__(
            host=host,
            port=port,
            database=database,
            auth_mechanism=auth_mechanism,
            user=user,
            password=password,
            save_dir=save_dir,
            logger=logger,
        )

    def _dump_renamed(self, file_name, sql, column_map, sort_columns):
        """Query, rename columns, sort and write ``save_dir/file_name`` under its lock."""
        file_path = os.path.join(self.save_dir, file_name)
        with self.get_lock(file_path):
            try:
                if not self.get_last_time(file_path):
                    return
                data: pd.DataFrame = self.db_helper.get_data(sql)
                data = data.rename(columns=column_map)
                data = data.sort_values(sort_columns)
                data.to_csv(file_path)
                self.save_last_time(file_path)
            except Exception as ex:
                # The logger is optional; never let logging hide the error.
                if self.logger is not None:
                    self.logger.exception(ex)
                raise

    def get_sales_goal_amt(self):
        """Sales goal amount per shop, series and month of year."""
        sql = """
            select sales_goal.shop_no,
                if(sales_goal.serial='Y','W',sales_goal.serial) as `sales_goal.serial`,
                dates.month_of_year,
                sum(sales_goal.sales_goal_amt) as sales_goal_amt
            from ur_bi_dw.dwd_sales_goal_west as sales_goal
            inner join ur_bi_dw.dim_date as dates
                on sales_goal.date_key = dates.date_key
            group by sales_goal.shop_no,
                if(sales_goal.serial='Y','W',sales_goal.serial),
                dates.month_of_year
            """
        self._dump_renamed(
            "month_of_year_sales_goal_amt.csv",
            sql,
            {
                "shop_no": "sales_goal.shop_no",
                "serial": "sales_goal.serial",
                "month_of_year": "dates.month_of_year",
            },
            ["sales_goal.shop_no", "sales_goal.serial", "dates.month_of_year"],
        )

    def get_shop_serial_area(self):
        """Floor area per shop, series and month of year."""
        sql = """
            select shop_serial_area.shop_no,
                if(shop_serial_area.serial='Y','W',shop_serial_area.serial) as `shop_serial_area.serial`,
                shop_serial_area.month_of_year,
                sum(shop_serial_area.area) as `shop_serial_area.area`
            from ur_bi_dw.dwd_shop_serial_area as shop_serial_area
            where shop_serial_area.area is not null
            group by shop_serial_area.shop_no, if(shop_serial_area.serial='Y','W',shop_serial_area.serial), shop_serial_area.month_of_year
            """
        self._dump_renamed(
            "shop_serial_area.csv",
            sql,
            {
                "shop_no": "shop_serial_area.shop_no",
                "serial": "shop_serial_area.serial",
                "month_of_year": "shop_serial_area.month_of_year",
                "area": "shop_serial_area.area",
            },
            ["shop_serial_area.shop_no", "shop_serial_area.serial", "shop_serial_area.month_of_year"],
        )

def get_datas(
        host=2,  # NOTE(review): looks like the residue of a redacted address
        port=21051,
        database="ur_ai_dw",
        auth_mechanism="LDAP",
        user="urbi",
        # SECURITY: hard-coded credential kept only for interface
        # compatibility; pass it in from configuration instead.
        password="Ur#730xd",
        save_dir="./data/sales_forecast/ur_bi_dw_data",
        logger: logging.Logger = None):
    """Refresh the project-specific extracts (monthly sales goal amounts).

    The connection is closed afterwards whether or not the query failed.

    NOTE(review): the progress messages were bare parenthesised strings in
    the extracted source; they are reconstructed as info-level log calls.

    Raises:
        Exception: Whatever the extract raised, after logging it (when a
            logger is given).
    """
    ur_bi_get_datas = CustomUrBiGetDatas(
        host=host,
        port=port,
        database=database,
        auth_mechanism=auth_mechanism,
        user=user,
        password=password,
        save_dir=save_dir,
        logger=logger,
    )
    try:
        # shop, series, category, year-month, sales goal amount
        if logger is not None:
            logger.info("正在查詢年月銷售目標金額數據...")
        ur_bi_get_datas.get_sales_goal_amt()
        if logger is not None:
            logger.info("查詢年月銷售目標金額數據完成!")
    except Exception as ex:
        # ``logger`` defaults to None; do not replace the real error with an
        # AttributeError from the logging call.
        if logger is not None:
            logger.exception(ex)
        raise
    finally:
        ur_bi_get_datas.close()

def getdata_ur_bi_dw(
        host=2,  # NOTE(review): looks like the residue of a redacted address
        port=21051,
        database="ur_ai_dw",
        auth_mechanism="LDAP",
        user="urbi",
        # SECURITY: hard-coded credential kept only for interface
        # compatibility; pass it in from configuration instead.
        password="Ur#730xd",
        save_dir="./data/sales_forecast/ur_bi_dw_data",
        logger=None):
    """Entry point: refresh the shared extracts, then the project-specific ones.

    ``save_dir`` applies only to the project-specific extracts; the shared
    ones are written below the ``common_datas_dir`` setting.
    """
    connection = dict(
        host=host,
        port=port,
        database=database,
        auth_mechanism=auth_mechanism,
        user=user,
        password=password,
    )
    get_common_datas(logger=logger, **connection)
    get_datas(save_dir=save_dir, logger=logger, **connection)

#代碼入口

#getdata_ur_bi_dw(

#host=ur_bi_dw_host,

#port=ur_bi_dw_port,

#database=ur_bi_dw_database,

#auth_mechanism=ur_bi_dw_auth_mechanism,

#user=ur_bi_dw_user,

#password=ur_bi_dw_password,

#save_dir=ur_bi_dw_save_dir,

#logger=logger

#)

代碼說明和領悟

每個類的具體作用說明,代碼

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯系上傳者。文件的所有權益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經權益所有人同意不得將文件中的內容挪作商業或盈利用途。
  • 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
  • 6. 下載文件中如有侵權或不適當內容,請與我們聯系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論