




版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領
文檔簡介
第如何使用Python讀取Hive數據庫?port=21051,
database=ur_AI_dw,
auth_mechanism=LDAP,
user=urbi,
password=Ur#730xd,
logger:logging.Logger=None
self.host=host
self.port=port
self.database=database
self.auth_mechanism=auth_mechanism
self.user=user
self.password=password
self.logger=logger
self.impala_conn=None
self.conn=None
self.cursor=None
self.engine=None
self.session=None
defcreate_table_code(self,file_name):
創建表類代碼
os.system(fsqlacodegen{self.connection_str}{file_name})
returnself.conn
defget_conn(self):
創建連接或獲取連接
ifself.connisNone:
engine=self.get_engine()
self.conn=engine.connect()
returnself.conn
defget_impala_conn(self):
創建連接或獲取連接
ifself.impala_connisNone:
self.impala_conn=connect(
host=self.host,
port=self.port,
database=self.database,
auth_mechanism=self.auth_mechanism,
user=self.user,
password=self.password
returnself.impala_conn
defget_engine(self):
創建連接或獲取連接
ifself.engineisNone:
self.engine=sqlalchemy.create_engine(impala://,creator=self.get_impala_conn)
returnself.engine
defget_cursor(self):
創建連接或獲取連接
ifself.cursorisNone:
self.cursor=self.conn.cursor()
returnself.cursor
defget_session(self)-sessionmaker:
創建連接或獲取連接
ifself.sessionisNone:
engine=self.get_engine()
Session=sessionmaker(bind=engine)
self.session=Session()
returnself.session
defclose_conn(self):
關閉連接
ifself.connisnotNone:
self.conn.close()
self.conn=None
self.dispose_engine()
self.close_impala_conn()
defclose_impala_conn(self):
關閉impala連接
ifself.impala_connisnotNone:
self.impala_conn.close()
self.impala_conn=None
defclose_session(self):
關閉連接
ifself.sessionisnotNone:
self.session.close()
self.session=None
self.dispose_engine()
defdispose_engine(self):
釋放engine
ifself.engineisnotNone:
#self.engine.dispose(close=False)
self.engine.dispose()
self.engine=None
defclose_cursor(self):
關閉cursor
ifself.cursorisnotNone:
self.cursor.close()
self.cursor=None
defget_data(self,sql,auto_close=True)-pd.DataFrame:
查詢數據
conn=self.get_conn()
data=None
try:
#異常重試3次
foriinrange(3):
try:
data=pd.read_sql(sql,conn)
break
exceptExceptionasex:
ifi==2:
raiseex#往外拋出異常
time.sleep(60)#一分鐘后重試
exceptExceptionasex:
self.logger.exception(ex)
raiseex#往外拋出異常
finally:
ifauto_close:
self.close_conn()
returndata
classVarsHelper():
def__init__(self,save_dir,auto_save=True):
self.save_dir=save_dir
self.auto_save=auto_save
self.values={}
ifnotos.path.exists(os.path.dirname(self.save_dir)):
os.makedirs(os.path.dirname(self.save_dir))
ifos.path.exists(self.save_dir):
withopen(self.save_dir,rb)asf:
self.values=pickle.load(f)
f.close()
defset_value(self,key,value):
self.values[key]=value
ifself.auto_save:
self.save_file()
defget_value(self,key):
returnself.values[key]
defhas_key(self,key):
returnkeyinself.values.keys()
defsave_file(self):
withopen(self.save_dir,wb)asf:
pickle.dump(self.values,f)
f.close()
classGlobalShareArgs():
args={
debug:False
defget_args():
returnGlobalShareArgs.args
defset_args(args):
GlobalShareArgs.args=args
defset_args_value(key,value):
GlobalShareArgs.args[key]=value
defget_args_value(key,default_value=None):
returnGlobalShareArgs.args.get(key,default_value)
defcontain_key(key):
returnkeyinGlobalShareArgs.args.keys()
defupdate(args):
GlobalShareArgs.args.update(args)
classShareArgs():
args={
labels_dir:./hjx/shop_group/month_w_amt/data/labels,#標簽目錄
labels_output_dir:./hjx/shop_group/month_w_amt/data/labels_output,#聚類導出標簽目錄
common_datas_dir:./hjx/data,#共用數據目錄。ur_bi_dw的公共
only_predict:False,#只識別,不訓練
delete_model:True,#先刪除模型,僅在訓練時使用
export_excel:False,#導出excel
classes:12,#聚類數
batch_size:16,
hidden_size:32,
max_nrof_epochs:100,
learning_rate:0.0005,
loss_type:categorical_crossentropy,
avg_model_num:10,
steps_per_epoch:4.0,#4.0
lr_callback_patience:4,
lr_callback_cooldown:1,
early_stopping_callback_patience:6,
get_data:True,
defget_args():
returnShareArgs.args
defset_args(args):
ShareArgs.args=args
defset_args_value(key,value):
ShareArgs.args[key]=value
defget_args_value(key,default_value=None):
returnShareArgs.args.get(key,default_value)
defcontain_key(key):
returnkeyinShareArgs.args.keys()
defupdate(args):
ShareArgs.args.update(args)
classUrBiGetDatasBase():
#線程鎖列表,同保存路徑共用鎖
lock_dict:Dict[str,threading.Lock]={}
#時間列表,用于判斷是否超時
time_dict:Dict[str,datetime.datetime]={}
#用于記錄是否需要更新超時時間
get_data_timeout_dict:Dict[str,bool]={}
def__init__(
self,
host=2,
port=21051,
database=ur_ai_dw,
auth_mechanism=LDAP,
user=urbi,
password=Ur#730xd,
save_dir=None,
logger:logging.Logger=None,
self.save_dir=save_dir
self.logger=logger
self.db_helper=HiveHelper(
host=host,
port=port,
database=database,
auth_mechanism=auth_mechanism,
user=user,
password=password,
logger=logger
#創建子目錄
ifself.save_dirisnotNoneandnotos.path.exists(self.save_dir):
os.makedirs(self.save_dir)
self.vars_helper=None
ifGlobalShareArgs.get_args_value(debug):
self.vars_helper=VarsHelper(./hjx/data/vars/UrBiGetDatas)
defclose(self):
關閉連接
self.db_helper.close_conn()
defget_last_time(self,key_name)-bool:
獲取是否超時
#轉靜態路徑,確保唯一性
key_name=os.path.abspath(key_name)
ifself.vars_helperisnotNoneandself.vars_helper.has_key(UrBiGetDatasBase.time_list):
UrBiGetDatasBase.time_dict=self.vars_helper.get_value(UrBiGetDatasBase.time_list)
timeout=12#12小時
ifGlobalShareArgs.get_args_value(debug):
timeout=24#24小時
get_data_timeout=False
ifkey_namenotinUrBiGetDatasBase.time_dict.keys()or(datetime.datetime.today()-UrBiGetDatasBase.time_dict[key_name]).total_seconds()(timeout*60*60):
(超時%d小時,重新查數據:%s,timeout,key_name)
#UrBiGetDatasBase.time_list[key_name]=datetime.datetime.today()
get_data_timeout=True
else:
(未超時%d小時,跳過查數據:%s,timeout,key_name)
#ifself.vars_helperisnotNone:
#self.vars_helper.set_value(UrBiGetDatasBase.time_list,UrBiGetDatasBase.time_list)
UrBiGetDatasBase.get_data_timeout_dict[key_name]=get_data_timeout
returnget_data_timeout
defsave_last_time(self,key_name):
更新狀態超時
#轉靜態路徑,確保唯一性
key_name=os.path.abspath(key_name)
ifUrBiGetDatasBase.get_data_timeout_dict[key_name]:
UrBiGetDatasBase.time_dict[key_name]=datetime.datetime.today()
ifself.vars_helperisnotNone:
UrBiGetDatasBase.time_dict[key_name]=datetime.datetime.today()
self.vars_helper.set_value(UrBiGetDatasBase.time_list,UrBiGetDatasBase.time_dict)
defget_lock(self,key_name)-threading.Lock:
獲取鎖
#轉靜態路徑,確保唯一性
key_name=os.path.abspath(key_name)
ifkey_namenotinUrBiGetDatasBase.lock_dict.keys():
UrBiGetDatasBase.lock_dict[key_name]=threading.Lock()
returnUrBiGetDatasBase.lock_dict[key_name]
defget_data_of_date(
self,
save_dir,
sql,
sort_columns:List[str],
del_index_list=[-1],#刪除最后下標
start_date=datetime.datetime(2017,1,1),#開始時間
offset=relativedelta(months=3),#時間間隔
date_format_fun=lambdad:%04d%02d01%(d.year,d.month),#查詢語句中替代時間參數的格式化
filename_format_fun=lambdad:%04d%02d.csv%(d.year,d.month),#查詢語句中替代時間參數的格式化
stop_date=20700101,#超過時間則停止
data_format_fun=None,#格式化數據
分時間增量讀取數據
#創建文件夾
ifnotos.path.exists(save_dir):
os.makedirs(save_dir)
else:
#刪除最后一個文件
file_list=os.listdir(save_dir)
iflen(file_list)0:
file_list.sort()
fordel_indexindel_index_list:
os.remove(os.path.join(save_dir,file_list[del_index]))
print(刪除最后一個文件:,file_list[del_index])
select_index=-1
#start_date=datetime.datetime(2017,1,1)
whileTrue:
end_date=start_date+offset
start_date_str=date_format_fun(start_date)
end_date_str=date_format_fun(end_date)
(date:%s-%s,start_date_str,end_date_str)
file_path=os.path.join(save_dir,filename_format_fun(start_date))
#(file_path:%s,file_path)
ifnotos.path.exists(file_path):
data:pd.DataFrame=self.db_helper.get_data(sql%(start_date_str,end_date_str))
ifdataisNone:
break
(data:%d,len(data))
#(data:%d,data.columns)
iflen(data)0:
select_index+=1
ifdata_format_funisnotNone:
data=data_format_fun(data)
#排序
data=data.sort_values(sort_columns)
data.to_csv(file_path)
elifselect_index!=-1:
break
elifstop_datestart_date_str:
raiseException(讀取數據異常,時間超出最大值!)
start_date=end_date
classUrBiGetDatas(UrBiGetDatasBase):
def__init__(
self,
host=2,
port=21051,
database=ur_ai_dw,
auth_mechanism=LDAP,
user=urbi,
password=Ur#730xd,
save_dir=./hjx/data/ur_bi_dw_data,
logger:logging.Logger=None
self.save_dir=save_dir
self.logger=logger
super().__init__(
host=host,
port=port,
database=database,
auth_mechanism=auth_mechanism,
user=user,
password=password,
save_dir=save_dir,
logger=logger
defget_dim_date(self):
日期數據
file_path=os.path.join(self.save_dir,ur_bi_dw.dim_date.csv)
now_lock=self.get_lock(file_path)
now_lock.acquire()#加鎖
try:
#設置超時4小時才重新查數據
ifnotself.get_last_time(file_path):
return
sql=SELECT*FROMur_bi_dw.dim_date
data:pd.DataFrame=self.db_helper.get_data(sql)
columns=list(data.columns)
columns={c:dim_date.+cforcincolumns}
data=data.rename(columns=columns)
data=data.sort_values([dim_date.date_key])
data.to_csv(file_path)
#更新超時時間
self.save_last_time(file_path)
exceptExceptionasex:
self.logger.exception(ex)
raiseex#往外拋出異常
finally:
now_lock.release()#釋放鎖
defget_dim_shop(self):
店鋪數據
file_path=os.path.join(self.save_dir,ur_bi_dw.dim_shop.csv)
now_lock=self.get_lock(file_path)
now_lock.acquire()#加鎖
try:
#設置超時4小時才重新查數據
ifnotself.get_last_time(file_path):
return
sql=SELECT*FROMur_bi_dw.dim_shop
data:pd.DataFrame=self.db_helper.get_data(sql)
columns=list(data.columns)
columns={c:dim_shop.+cforcincolumns}
data=data.rename(columns=columns)
data=data.sort_values([dim_shop.shop_no])
data.to_csv(file_path)
#更新超時時間
self.save_last_time(file_path)
exceptExceptionasex:
self.logger.exception(ex)
raiseex#往外拋出異常
finally:
now_lock.release()#釋放鎖
defget_dim_vip(self):
會員數據
sub_dir=os.path.join(self.save_dir,vip_no)
now_lock=self.get_lock(sub_dir)
now_lock.acquire()#加鎖
try:
#設置超時4小時才重新查數據
ifnotself.get_last_time(sub_dir):
return
sql=SELECTdv.*,dd.date_key,dd.date_name2
FROMur_bi_dw.dim_vipasdv
INNERJOINur_bi_dw.dim_dateasdd
ONdv.card_create_date=dd.date_name2
wheredd.date_key=%s
anddd.date_key%s
#data:pd.DataFrame=self.db_helper.get_data(sql)
sort_columns=[dv.vip_no]
#TODO:
self.get_data_of_date(
save_dir=sub_dir,
sql=sql,
sort_columns=sort_columns,
start_date=datetime.datetime(2017,1,1),#開始時間
offset=relativedelta(years=1)
#更新超時時間
self.save_last_time(sub_dir)
exceptExceptionasex:
self.logger.exception(ex)
raiseex#往外拋出異常
finally:
now_lock.release()#釋放鎖
defget_weather(self):
天氣數據
sub_dir=os.path.join(self.save_dir,weather)
now_lock=self.get_lock(sub_dir)
now_lock.acquire()#加鎖
try:
#設置超時4小時才重新查數據
ifnotself.get_last_time(sub_dir):
return
sql=
selectweather.*fromur_bi_ods.ods_base_weather_data_1200asweather
whereweather.date_key=%sandweather.date_key%s
sort_columns=[weather.date_key,weather.areaid]
defdata_format_fun(data):
columns=list(data.columns)
columns={c:weather.+cforcincolumns}
data=data.rename(columns=columns)
returndata
self.get_data_of_date(
save_dir=sub_dir,
sql=sql,
sort_columns=sort_columns,
del_index_list=[-2,-1],#刪除最后下標
data_format_fun=data_format_fun,
#更新超時時間
self.save_last_time(sub_dir)
exceptExceptionasex:
self.logger.exception(ex)
raiseex#往外拋出異常
finally:
now_lock.release()#釋放鎖
defget_weather_city(self):
天氣城市數據
file_path=os.path.join(self.save_dir,ur_bi_dw.weather_city.csv)
now_lock=self.get_lock(file_path)
now_lock.acquire()#加鎖
try:
#設置超時4小時才重新查數據
ifnotself.get_last_time(file_path):
return
sql=SELECT*FROMur_bi_dw.dim_weather_cityasweather_city
data:pd.DataFrame=self.db_helper.get_data(sql)
columns=list(data.columns)
columns={c:weather_city.+cforcincolumns}
data=data.rename(columns=columns)
data.to_csv(file_path)
#更新超時時間
self.save_last_time(file_path)
exceptExceptionasex:
self.logger.exception(ex)
raiseex#往外拋出異常
finally:
now_lock.release()#釋放鎖
defget_dim_goods(self):
貨品數據
file_path=os.path.join(self.save_dir,ur_bi_dw.dim_goods.csv)
now_lock=self.get_lock(file_path)
now_lock.acquire()#加鎖
try:
#設置超時4小時才重新查數據
ifnotself.get_last_time(file_path):
return
sql=SELECT*FROMur_bi_dw.dim_goods
data:pd.DataFrame=self.db_helper.get_data(sql)
columns=list(data.columns)
columns={c:dim_goods.+cforcincolumns}
data=data.rename(columns=columns)
data.to_csv(file_path)
#更新超時時間
self.save_last_time(file_path)
exceptExceptionasex:
self.logger.exception(ex)
raiseex#往外拋出異常
finally:
now_lock.release()#釋放鎖
defget_dim_goods_market_shop_date(self):
店鋪商品生命周期數據
file_path=os.path.join(self.save_dir,ur_bi_dw.dim_goods_market_shop_date.csv)
now_lock=self.get_lock(file_path)
now_lock.acquire()#加鎖
try:
#設置超時4小時才重新查數據
ifnotself.get_last_time(file_path):
return
#sql=SELECT*FROMur_bi_dw.dim_goods_market_shop_dateasgoods_shop_date
sql=
selectshop_no,sku_no,shop_market_date,lifecycle_end_date,lifecycle_days
FROMur_bi_dw.dim_goods_market_shop_date
wherelifecycle_end_dateisnotnull
data:pd.DataFrame=self.db_helper.get_data(sql)
columns=list(data.columns)
columns={c:c.replace(lifecycle_end_date.,)forcincolumns}
data=data.rename(columns=columns)
data=data.sort_values([shop_market_date])
data.to_csv(file_path,index=False)
#更新超時時間
self.save_last_time(file_path)
exceptExceptionasex:
self.logger.exception(ex)
raiseex#往外拋出異常
finally:
now_lock.release()#釋放鎖
defget_dim_goods_market_date(self):
全國商品生命周期數據
file_path=os.path.join(self.save_dir,ur_bi_dw.dim_goods_market_date.csv)
now_lock=self.get_lock(file_path)
now_lock.acquire()#加鎖
try:
#設置超時4小時才重新查數據
ifnotself.get_last_time(file_path):
return
sql=
select*FROMur_bi_dw.dim_goods_market_date
data:pd.DataFrame=self.db_helper.get_data(sql)
columns=list(data.columns)
columns={c:dim_goods_market_date.+cforcincolumns}
data=data.rename(columns=columns)
data=data.sort_values([dim_goods_market_date.sku_no])
data.to_csv(file_path,index=False)
#更新超時時間
self.save_last_time(file_path)
exceptExceptionasex:
self.logger.exception(ex)
raiseex#往外拋出異常
finally:
now_lock.release()#釋放鎖
defget_dim_goods_color_dev_sizes(self):
商品開發碼數數據
file_path=os.path.join(self.save_dir,dim_goods_color_dev_sizes.csv)
now_lock=self.get_lock(file_path)
now_lock.acquire()#加鎖
try:
#設置超時4小時才重新查數據
ifnotself.get_last_time(file_path):
return
#sql=SELECT*FROMur_bi_dw.dim_goods_market_shop_dateasgoods_shop_date
sql=SELECT*FROMur_bi_dm.dim_goods_color_dev_sizes
data:pd.DataFrame=self.db_helper.get_data(sql)
columns=list(data.columns)
columns={c:c.replace(dim_goods_color_dev_sizes.,)forcincolumns}
data=data.rename(columns=columns)
data.to_csv(file_path,index=False)
#更新超時時間
self.save_last_time(file_path)
exceptExceptionasex:
self.logger.exception(ex)
raiseex#往外拋出異常
finally:
now_lock.release()#釋放鎖
defget_dwd_daily_sales_size(self):
實際銷售金額
sub_dir=os.path.join(self.save_dir,dwd_daily_sales_size_all)
now_lock=self.get_lock(sub_dir)
now_lock.acquire()#加鎖
try:
#設置超時4小時才重新查數據
ifnotself.get_last_time(sub_dir):
return
sql=
selectshop_no,sku_no,date_key,`size`,
sum(tag_price)as`tag_price`,
sum(sales_qty)as`sales_qty`,
sum(sales_tag_amt)as`sales_tag_amt`,
sum(sales_amt)as`sales_amt`,
count(0)as`sales_count`
fromur_bi_dw.dwd_daily_sales_sizeassales
wheresales.date_key=%sandsales.date_key%s
andsales.currency_code=CNY
groupbyshop_no,sku_no,date_key,`size`
sort_columns=[date_key,shop_no,sku_no]
self.get_data_of_date(
save_dir=sub_dir,
sql=sql,
sort_columns=sort_columns,
start_date=datetime.datetime(2017,1,1),#開始時間
#更新超時時間
self.save_last_time(sub_dir)
exceptExceptionasex:
self.logger.exception(ex)
raiseex#往外拋出異常
finally:
now_lock.release()#釋放鎖
defget_dwd_daily_delivery_size(self):
實際配貨金額
sub_dir=os.path.join(self.save_dir,dwd_daily_delivery_size_all)
now_lock=self.get_lock(sub_dir)
now_lock.acquire()#加鎖
try:
#設置超時4小時才重新查數據
ifnotself.get_last_time(sub_dir):
return
sql=
selectshop_no,sku_no,date_key,`size`,
sum(delivery.shop_distr_received_qty)as`shop_distr_received_qty`,
sum(delivery.shop_distr_received_amt)as`shop_distr_received_amt`,
sum(delivery.online_distr_received_qty)as`online_distr_received_qty`,
sum(delivery.online_distr_received_amt)as`online_distr_received_amt`,
sum(delivery.pr_received_qty)as`pr_received_qty`,
count(0)as`delivery_count`
fromur_bi_dw.dwd_daily_delivery_sizeasdelivery
wheredelivery.date_key=%sanddelivery.date_key%s
anddelivery.currency_code=CNY
groupbyshop_no,sku_no,date_key,`size`
sort_columns=[date_key,shop_no,sku_no]
self.get_data_of_date(
save_dir=sub_dir,
sql=sql,
sort_columns=sort_columns,
start_date=datetime.datetime(2017,1,1),#開始時間
#更新超時時間
self.save_last_time(sub_dir)
exceptExceptionasex:
self.logger.exception(ex)
raiseex#往外拋出異常
finally:
now_lock.release()#釋放鎖
defget_v_last_nation_sales_status(self):
商品暢滯銷數據
file_path=os.path.join(self.save_dir,v_last_nation_sales_status.csv)
now_lock=self.get_lock(file_path)
now_lock.acquire()#加鎖
try:
#設置超時4小時才重新查數據
ifnotself.get_last_time(file_path):
return
sql=SELECT*FROMur_bi_dw.v_last_nation_sales_status
data:pd.DataFrame=self.db_helper.get_data(sql)
columns=list(data.columns)
columns={c:c.replace(v_last_nation_sales_status.,)forcincolumns}
data=data.rename(columns=columns)
data.to_csv(file_path,index=False)
#更新超時時間
self.save_last_time(file_path)
exceptExceptionasex:
self.logger.exception(ex)
raiseex#往外拋出異常
finally:
now_lock.release()#釋放鎖
defget_dwd_daily_finacial_goods(self):
商品成本價數據
file_path=os.path.join(self.save_dir,dwd_daily_finacial_goods.csv)
now_lock=self.get_lock(file_path)
now_lock.acquire()#加鎖
try:
#設置超時4小時才重新查數據
ifnotself.get_last_time(file_path):
return
sql=
selectt1.sku_no,t1.`size`,t1.cost_tax_inclfromur_bi_dw.dwd_daily_finacial_goodsast1
innerjoin(
selectsku_no,`size`,max(date_key)asdate_key
fromur_bi_dw.dwd_daily_finacial_goods
wherecurrency_code=CNYandcountry_code=CN
groupbysku_no,`size`
)ast2
ont2.sku_no=t1.sku_no
andt2.`size`=t1.`size`
andt2.date_key=t1.date_key
wheret1.currency_code=CNYandt1.country_code=CN
data:pd.DataFrame=self.db_helper.get_data(sql)
columns=list(data.columns)
columns={c:c.replace(t1.,)forcincolumns}
data=data.rename(columns=columns)
data.to_csv(file_path,index=False)
#更新超時時間
self.save_last_time(file_path)
exceptExceptionasex:
self.logger.exception(ex)
raiseex#往外拋出異常
finally:
now_lock.release()#釋放鎖
defget_dim_size_group(self):
尺碼映射數據
file_path=os.path.join(self.save_dir,dim_size_group.csv)
now_lock=self.get_lock(file_path)
now_lock.acquire()#加鎖
try:
#設置超時4小時才重新查數據
ifnotself.get_last_time(file_path):
return
sql=select*fromur_bi_dw.dim_size_group
data:pd.DataFrame=self.db_helper.get_data(sql)
columns=list(data.columns)
columns={c:c.replace(dim_size_group.,)forcincolumns}
data=data.rename(columns=columns)
data.to_csv(file_path,index=False)
#更新超時時間
self.save_last_time(file_path)
exceptExceptionasex:
self.logger.exception(ex)
raiseex#往外拋出異常
finally:
now_lock.release()#釋放鎖
defget_common_datas(
host=2,
port=21051,
database=ur_ai_dw,
auth_mechanism=LDAP,
user=urbi,
password=Ur#730xd,
logger:logging.Logger=None):
#共用文件
common_datas_dir=ShareArgs.get_args_value(common_datas_dir)
common_ur_bi_dir=os.path.join(common_datas_dir,ur_bi_data)
ur_bi_get_datas=UrBiGetDatas(
host=host,
port=port,
database=database,
auth_mechanism=auth_mechanism,
user=user,
password=password,
save_dir=common_ur_bi_dir,
logger=logger
try:
(正在查詢日期數據...)
ur_bi_get_datas.get_dim_date()
(查詢日期數據完成!)
(正在查詢店鋪數據...)
ur_bi_get_datas.get_dim_shop()
(查詢店鋪數據完成!)
(正在查詢天氣數據...)
ur_bi_get_datas.get_weather()
(查詢天氣數據完成!)
(正在查詢天氣城市數據...)
ur_bi_get_datas.get_weather_city()
(查詢天氣城市數據完成!)
(正在查詢貨品數據...)
ur_bi_get_datas.get_dim_goods()
(查詢貨品數據完成!)
(正在查詢實際銷量數據...)
ur_bi_get_datas.get_dwd_daily_sales_size()
(查詢實際銷量數據完成!)
exceptExceptionasex:
logger.exception(ex)
raiseex#往外拋出異常
finally:
ur_bi_get_datas.close()
classCustomUrBiGetDatas(UrBiGetDatasBase):
def__init__(
self,
host=2,
port=21051,
database=ur_ai_dw,
auth_mechanism=LDAP,
user=urbi,
password=Ur#730xd,
save_dir=./hjx/data/ur_bi_data,
logger:logging.Logger=None
self.save_dir=save_dir
self.logger=logger
super().__init__(
host=host,
port=port,
database=database,
auth_mechanism=auth_mechanism,
user=user,
password=password,
save_dir=save_dir,
logger=logger
defget_sales_goal_amt(self):
銷售目標金額
file_path=os.path.join(self.save_dir,month_of_year_sales_goal_amt.csv)
now_lock=self.get_lock(file_path)
now_lock.acquire()#加鎖
try:
#設置超時4小時才重新查數據
ifnotself.get_last_time(file_path):
return
sql=
selectsales_goal.shop_no,
if(sales_goal.serial=Y,W,sales_goal.serial)as`sales_goal.serial`,
dates.month_of_year,
sum(sales_goal.sales_goal_amt)assales_goal_amt
fromur_bi_dw.dwd_sales_goal_westassales_goal
innerjoinur_bi_dw.dim_dateasdates
onsales_goal.date_key=dates.date_key
groupbysales_goal.shop_no,
if(sales_goal.serial=Y,W,sales_goal.serial),
dates.month_of_year
data:pd.DataFrame=self.db_helper.get_data(sql)
data=data.rename(columns={
shop_no:sales_goal.shop_no,
serial:sales_goal.serial,
month_of_year:dates.month_of_year,
#排序
data=data.sort_values([sales_goal.shop_no,sales_goal.serial,dates.month_of_year])
data.to_csv(file_path)
#更新超時時間
self.save_last_time(file_path)
exceptExceptionasex:
self.logger.exception(ex)
raiseex#往外拋出異常
finally:
now_lock.release()#釋放鎖
defget_shop_serial_area(self):
店-系列面積
file_path=os.path.join(self.save_dir,shop_serial_area.csv)
now_lock=self.get_lock(file_path)
now_lock.acquire()#加鎖
try:
#設置超時4小時才重新查數據
ifnotself.get_last_time(file_path):
return
sql=
selectshop_serial_area.shop_no,
if(shop_serial_area.serial=Y,W,shop_serial_area.serial)as`shop_serial_area.serial`,
shop_serial_area.month_of_year,
sum(shop_serial_area.area)as`shop_serial_area.area`
fromur_bi_dw.dwd_shop_serial_areaasshop_serial_area
whereshop_serial_area.areaisnotnull
groupbyshop_serial_area.shop_no,if(shop_serial_area.serial=Y,W,shop_serial_area.serial),shop_serial_area.month_of_year
data:pd.DataFrame=self.db_helper.get_data(sql)
data=data.rename(columns={
shop_no:shop_serial_area.shop_no,
serial:shop_serial_area.serial,
month_of_year:shop_serial_area.month_of_year,
area:shop_serial_area.area,
#排序
data=data.sort_values([shop_serial_area.shop_no,shop_serial_area.serial,shop_serial_area.month_of_year])
data.to_csv(file_path)
#更新超時時間
self.save_last_time(file_path)
exceptExceptionasex:
self.logger.exception(ex)
raiseex#往外拋出異常
finally:
now_lock.release()#釋放鎖
defget_datas(
host=2,
port=21051,
database=ur_ai_dw,
auth_mechanism=LDAP,
user=urbi,
password=Ur#730xd,
save_dir=./data/sales_forecast/ur_bi_dw_data,
logger:logging.Logger=None):
ur_bi_get_datas=CustomUrBiGetDatas(
host=host,
port=port,
database=database,
auth_mechanism=auth_mechanism,
user=user,
password=password,
save_dir=save_dir,
logger=logger
try:
#店,系列,品類,年月,銷售目標金額
(正在查詢年月銷售目標金額數據...)
ur_bi_get_datas.get_sales_goal_amt()
(查詢年月銷售目標金額數據完成!)
exceptExceptionasex:
logger.exception(ex)
raiseex#往外拋出異常
finally:
ur_bi_get_datas.close()
defgetdata_ur_bi_dw(
host=2,
port=21051,
database=ur_ai_dw,
auth_mechanism=LDAP,
user=urbi,
password=Ur#730xd,
save_dir=./data/sales_forecast/ur_bi_dw_data,
logger=None
get_common_datas(
host=host,
port=port,
database=database,
auth_mechanism=auth_mechanism,
user=user,
password=password,
logger=logger
get_datas(
host=host,
port=port,
database=database,
auth_mechanism=auth_mechanism,
user=user,
password=password,
save_dir=save_dir,
logger=logger
#代碼入口
#getdata_ur_bi_dw(
#host=ur_bi_dw_host,
#port=ur_bi_dw_port,
#database=ur_bi_dw_database,
#auth_mechanism=ur_bi_dw_auth_mechanism,
#user=ur_bi_dw_user,
#password=ur_bi_dw_password,
#save_dir=ur_bi_dw_save_dir,
#logger=logger
#)
代碼說明和領悟
每個類的具體作用說明,代碼
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯系上傳者。文件的所有權益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
- 4. 未經權益所有人同意不得將文件中的內容挪作商業或盈利用途。
- 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
- 6. 下載文件中如有侵權或不適當內容,請與我們聯系,我們立即糾正。
- 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 智能物流采購合同管理規范體系
- 節能環保型廠房租賃及設備更新合同
- 城市公交車輛運輸及線路運營合同
- 律師林肯教學課件
- 農業市場國際化合作考核試卷
- 微生物檢驗與保健食品市場準入制度考核試卷
- 護理遴選試題及答案
- 市場分析與預測生產工藝自動化升級考核試卷
- 租賃業務租賃合同條款風險審查要點考核試卷
- 生物圖譜試題及答案
- 2025年內蒙古自治區中考數學真題試卷(含答案)
- CT增強掃描造影劑外滲的預防與處理
- Unit 2 Home Sweet Home 第6課時(Project Reading Plus) 2025-2026學年人教版英語八年級下冊
- xx公司獎金管理制度
- 勞務服務購買協議書范本
- 2025-2030年中國生物醫學材料行業市場深度分析及發展前景與投資研究報告
- 2025年小學語文一年級下冊無紙筆測試題(小學一年級游園樂考無紙化檢測)
- 2025至2030中國彈簧鋼行業產業運行態勢及投資規劃深度研究報告
- 2025年地理中考時政熱點復習課件
- 區塊鏈技術在廣告業的應用行業跨境出海項目商業計劃書
- 2025-2030年中國臨空經濟行業深度評估及市場研究發展研究報告
評論
0/150
提交評論