0

0

如何使用Python读取Hive数据库?

PHPz

PHPz

发布时间:2023-05-09 16:28:07

|

2656人浏览过

|

来源于亿速云

转载

实际业务读取hive数据库的代码

import logging
import pandas as pd
from impala.dbapi import connect
import sqlalchemy
from sqlalchemy.orm import sessionmaker
import os
import time
import os
import datetime
from dateutil.relativedelta import relativedelta
from typing import Dict, List
import logging
import threading
import pandas as pd
import pickle
class HiveHelper(object):
    def __init__(
        self,
        host='10.2.32.22',
        port=21051,
        database='ur_ai_dw',
        auth_mechanism='LDAP',
        user='urbi',
        password='Ur#730xd',
        logger:logging.Logger=None
        ):
        self.host = host
        self.port = port
        self.database = database
        self.auth_mechanism = auth_mechanism
        self.user = user
        self.password = password
        self.logger = logger
        self.impala_conn = None
        self.conn = None
        self.cursor = None
        self.engine = None
        self.session = None
    def create_table_code(self, file_name):
        '''创建表类代码'''
        os.system(f'sqlacodegen {self.connection_str} > {file_name}')
        return self.conn
    def get_conn(self):
        '''创建连接或获取连接'''
        if self.conn is None:
            engine = self.get_engine()
            self.conn = engine.connect()
        return self.conn
    def get_impala_conn(self):
        '''创建连接或获取连接'''
        if self.impala_conn is None:
            self.impala_conn = connect(
                host=self.host,
                port=self.port,
                database=self.database,
                auth_mechanism=self.auth_mechanism,
                user=self.user,
                password=self.password
                )
        return self.impala_conn
    def get_engine(self):
        '''创建连接或获取连接'''
        if self.engine is None:
            self.engine = sqlalchemy.create_engine('impala://', creator=self.get_impala_conn)
        return self.engine
    def get_cursor(self):
        '''创建连接或获取连接'''
        if self.cursor is None:
            self.cursor = self.conn.cursor()
        return self.cursor
    def get_session(self) -> sessionmaker:
        '''创建连接或获取连接'''
        if self.session is None:
            engine = self.get_engine()
            Session = sessionmaker(bind=engine)
            self.session = Session()
        return self.session
    def close_conn(self):
        '''关闭连接'''
        if self.conn is not None:
            self.conn.close()
            self.conn = None
        self.dispose_engine()
        self.close_impala_conn()
    def close_impala_conn(self):
        '''关闭impala连接'''
        if self.impala_conn is not None:
            self.impala_conn.close()
            self.impala_conn = None
    def close_session(self):
        '''关闭连接'''
        if self.session is not None:
            self.session.close()
            self.session = None
        self.dispose_engine()
    def dispose_engine(self):
        '''释放engine'''
        if self.engine is not None:
            # self.engine.dispose(close=False)
            self.engine.dispose()
            self.engine = None
    def close_cursor(self):
        '''关闭cursor'''
        if self.cursor is not None:
            self.cursor.close()
            self.cursor = None
    def get_data(self, sql, auto_close=True) -> pd.DataFrame:
        '''查询数据'''
        conn = self.get_conn()
        data = None
        try:
            # 异常重试3次
            for i in range(3):
                try:
                    data = pd.read_sql(sql, conn)
                    break
                except Exception as ex:
                    if i == 2:
                        raise ex # 往外抛出异常
                    time.sleep(60) # 一分钟后重试
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            if auto_close:
                self.close_conn()
        return data
pass
class VarsHelper():
    def __init__(self, save_dir, auto_save=True):
        self.save_dir = save_dir
        self.auto_save = auto_save
        self.values = {}
        if not os.path.exists(os.path.dirname(self.save_dir)):
            os.makedirs(os.path.dirname(self.save_dir))
        if os.path.exists(self.save_dir):
            with open(self.save_dir, 'rb') as f:
                self.values = pickle.load(f)
                f.close()
    def set_value(self, key, value):
        self.values[key] = value
        if self.auto_save:
            self.save_file()
    def get_value(self, key):
        return self.values[key]
    def has_key(self, key):
        return key in self.values.keys()
    def save_file(self):
        with open(self.save_dir, 'wb') as f:
            pickle.dump(self.values, f)
            f.close()
pass
class GlobalShareArgs():
    args = {
        "debug": False
    }
    def get_args():
        return GlobalShareArgs.args
    def set_args(args):
        GlobalShareArgs.args = args
    def set_args_value(key, value):
        GlobalShareArgs.args[key] = value
    def get_args_value(key, default_value=None):
        return GlobalShareArgs.args.get(key, default_value)
    def contain_key(key):
        return key in GlobalShareArgs.args.keys()
    def update(args):
        GlobalShareArgs.args.update(args)
pass
class ShareArgs():
    args = {
        "labels_dir":"./hjx/shop_group/month_w_amt/data/labels", # 标签目录
        "labels_output_dir":"./hjx/shop_group/month_w_amt/data/labels_output", # 聚类导出标签目录
        "common_datas_dir":"./hjx/data", # 共用数据目录。ur_bi_dw的公共
        "only_predict": False, # 只识别,不训练
        "delete_model": True, # 先删除模型,仅在训练时使用
        "export_excel": False, # 导出excel
        "classes": 12, # 聚类数
        "batch_size": 16,
        "hidden_size": 32,
        "max_nrof_epochs": 100,
        "learning_rate": 0.0005,
        "loss_type": "categorical_crossentropy",
        "avg_model_num": 10,
        "steps_per_epoch": 4.0, # 4.0
        "lr_callback_patience": 4, 
        "lr_callback_cooldown": 1,
        "early_stopping_callback_patience": 6,
        "get_data": True,
    }
    def get_args():
        return ShareArgs.args
    def set_args(args):
        ShareArgs.args = args
    def set_args_value(key, value):
        ShareArgs.args[key] = value
    def get_args_value(key, default_value=None):
        return ShareArgs.args.get(key, default_value)
    def contain_key(key):
        return key in ShareArgs.args.keys()
    def update(args):
        ShareArgs.args.update(args)
pass
class UrBiGetDatasBase():
    # 线程锁列表,同保存路径共用锁
    lock_dict:Dict[str, threading.Lock] = {}
    # 时间列表,用于判断是否超时
    time_dict:Dict[str, datetime.datetime] = {}
    # 用于记录是否需要更新超时时间
    get_data_timeout_dict:Dict[str, bool] = {}
    def __init__(
        self,
        host='10.2.32.22',
        port=21051,
        database='ur_ai_dw',
        auth_mechanism='LDAP',
        user='urbi',
        password='Ur#730xd',
        save_dir=None,
        logger:logging.Logger=None,
        ):
        self.save_dir = save_dir
        self.logger = logger
        self.db_helper = HiveHelper(
            host=host,
            port=port,
            database=database,
            auth_mechanism=auth_mechanism,
            user=user,
            password=password,
            logger=logger
            )
        # 创建子目录
        if self.save_dir is not None and not os.path.exists(self.save_dir):
            os.makedirs(self.save_dir)
        self.vars_helper = None
        if GlobalShareArgs.get_args_value('debug'):
            self.vars_helper = VarsHelper('./hjx/data/vars/UrBiGetDatas') 
    def close(self):
        '''关闭连接'''
        self.db_helper.close_conn()
    def get_last_time(self, key_name) -> bool:
        '''获取是否超时'''
        # 转静态路径,确保唯一性
        key_name = os.path.abspath(key_name)
        if self.vars_helper is not None and self.vars_helper.has_key('UrBiGetDatasBase.time_list'):
            UrBiGetDatasBase.time_dict = self.vars_helper.get_value('UrBiGetDatasBase.time_list')
        timeout = 12 # 12小时
        if GlobalShareArgs.get_args_value('debug'):
            timeout = 24 # 24小时
        get_data_timeout = False
        if key_name not in UrBiGetDatasBase.time_dict.keys() or (datetime.datetime.today() - UrBiGetDatasBase.time_dict[key_name]).total_seconds()>(timeout*60*60):
            self.logger.info('超时%d小时,重新查数据:%s', timeout, key_name)
            # UrBiGetDatasBase.time_list[key_name] = datetime.datetime.today()
            get_data_timeout = True
        else:
            self.logger.info('未超时%d小时,跳过查数据:%s', timeout, key_name)
        # if self.vars_helper is not None :
        #     self.vars_helper.set_value('UrBiGetDatasBase.time_list', UrBiGetDatasBase.time_list)
        UrBiGetDatasBase.get_data_timeout_dict[key_name] = get_data_timeout
        return get_data_timeout
    def save_last_time(self, key_name):
        '''更新状态超时'''
        # 转静态路径,确保唯一性
        key_name = os.path.abspath(key_name)
        if UrBiGetDatasBase.get_data_timeout_dict[key_name]:
            UrBiGetDatasBase.time_dict[key_name] = datetime.datetime.today()
        if self.vars_helper is not None :
            UrBiGetDatasBase.time_dict[key_name] = datetime.datetime.today()
            self.vars_helper.set_value('UrBiGetDatasBase.time_list', UrBiGetDatasBase.time_dict)
    def get_lock(self, key_name) -> threading.Lock:
        '''获取锁'''
        # 转静态路径,确保唯一性
        key_name = os.path.abspath(key_name)
        if key_name not in UrBiGetDatasBase.lock_dict.keys():
            UrBiGetDatasBase.lock_dict[key_name] = threading.Lock()
        return UrBiGetDatasBase.lock_dict[key_name]
    def get_data_of_date(
        self,
        save_dir,
        sql,
        sort_columns:List[str],
        del_index_list=[-1], # 删除最后下标
        start_date = datetime.datetime(2017, 1, 1), # 开始时间
        offset = relativedelta(months=3), # 时间间隔
        date_format_fun = lambda d: '%04d%02d01' % (d.year, d.month), # 查询语句中替代时间参数的格式化
        filename_format_fun = lambda d: '%04d%02d.csv' % (d.year, d.month), # 查询语句中替代时间参数的格式化
        stop_date = '20700101', # 超过时间则停止
        data_format_fun = None, # 格式化数据
        ):
        '''分时间增量读取数据'''
        # 创建文件夹
        if not os.path.exists(save_dir):
            os.makedirs(save_dir)
        else:
            #删除最后一个文件
            file_list = os.listdir(save_dir)
            if len(file_list)>0:
                file_list.sort()
                for del_index in del_index_list:
                    os.remove(os.path.join(save_dir,file_list[del_index]))
                    print('删除最后一个文件:', file_list[del_index])
        select_index = -1
        # start_date = datetime.datetime(2017, 1, 1)
        while True:
            end_date = start_date + offset
            start_date_str = date_format_fun(start_date)
            end_date_str = date_format_fun(end_date)
            self.logger.info('date: %s-%s', start_date_str, end_date_str)
            file_path = os.path.join(save_dir, filename_format_fun(start_date))
            # self.logger.info('file_path: %s', file_path)
            if not os.path.exists(file_path):
                data:pd.DataFrame = self.db_helper.get_data(sql % (start_date_str, end_date_str))
                if data is None:
                    break
                self.logger.info('data: %d', len(data))
                # self.logger.info('data: %d', data.columns)
                if len(data)>0:
                    select_index+=1
                    if data_format_fun is not None:
                        data = data_format_fun(data)
                    # 排序
                    data = data.sort_values(sort_columns)
                    data.to_csv(file_path)
                elif select_index!=-1:
                    break
                elif stop_date < start_date_str:
                    raise Exception("读取数据异常,时间超出最大值!")
            start_date = end_date
pass
class UrBiGetDatas(UrBiGetDatasBase):
    def __init__(
        self,
        host='10.2.32.22',
        port=21051,
        database='ur_ai_dw',
        auth_mechanism='LDAP',
        user='urbi',
        password='Ur#730xd',
        save_dir='./hjx/data/ur_bi_dw_data',
        logger:logging.Logger=None
        ):
        self.save_dir = save_dir
        self.logger = logger
        super().__init__(
            host=host,
            port=port,
            database=database,
            auth_mechanism=auth_mechanism,
            user=user,
            password=password,
            save_dir=save_dir,
            logger=logger
            )
    def get_dim_date(self):
        '''日期数据'''
        file_path = os.path.join(self.save_dir,'ur_bi_dw.dim_date.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(file_path):
                return
            sql = 'SELECT * FROM ur_bi_dw.dim_date'
            data:pd.DataFrame = self.db_helper.get_data(sql)
            columns = list(data.columns)
            columns = {c:'dim_date.'+c for c in columns}
            data = data.rename(columns=columns)
            data = data.sort_values(['dim_date.date_key'])
            data.to_csv(file_path)
            # 更新超时时间
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
    def get_dim_shop(self):
        '''店铺数据'''
        file_path = os.path.join(self.save_dir,'ur_bi_dw.dim_shop.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(file_path):
                return
            sql = 'SELECT * FROM ur_bi_dw.dim_shop'
            data:pd.DataFrame = self.db_helper.get_data(sql)
            columns = list(data.columns)
            columns = {c:'dim_shop.'+c for c in columns}
            data = data.rename(columns=columns)
            data = data.sort_values(['dim_shop.shop_no'])
            data.to_csv(file_path)
            # 更新超时时间
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
    def get_dim_vip(self):
        '''会员数据'''
        sub_dir = os.path.join(self.save_dir,'vip_no')
        now_lock = self.get_lock(sub_dir)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(sub_dir):
                return
            sql = '''SELECT dv.*, dd.date_key, dd.date_name2 
            FROM ur_bi_dw.dim_vip as dv
            INNER JOIN ur_bi_dw.dim_date as dd
            ON dv.card_create_date=dd.date_name2 
            where dd.date_key >= %s
            and dd.date_key < %s'''
            # data:pd.DataFrame = self.db_helper.get_data(sql)
            sort_columns = ['dv.vip_no']
            # TODO:
            self.get_data_of_date(
                save_dir=sub_dir,
                sql=sql,
                sort_columns=sort_columns,
                start_date=datetime.datetime(2017, 1, 1), # 开始时间
                offset=relativedelta(years=1)
            )
            # 更新超时时间
            self.save_last_time(sub_dir)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
    def get_weather(self):
        '''天气数据'''
        sub_dir = os.path.join(self.save_dir,'weather')
        now_lock = self.get_lock(sub_dir)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(sub_dir):
                return
            sql = """
            select weather.* from ur_bi_ods.ods_base_weather_data_1200 as weather
            where weather.date_key>=%s and weather.date_key<%s
            """
            sort_columns = ['weather.date_key','weather.areaid']
            def data_format_fun(data):
                columns = list(data.columns)
                columns = {c:'weather.'+c for c in columns}
                data = data.rename(columns=columns)
                return data
            self.get_data_of_date(
                save_dir=sub_dir,
                sql=sql,
                sort_columns=sort_columns,
                del_index_list=[-2, -1], # 删除最后下标
                data_format_fun=data_format_fun,
            )
            # 更新超时时间
            self.save_last_time(sub_dir)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
    def get_weather_city(self):
        '''天气城市数据'''
        file_path = os.path.join(self.save_dir,'ur_bi_dw.weather_city.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(file_path):
                return
            sql = 'SELECT * FROM ur_bi_dw.dim_weather_city as weather_city'
            data:pd.DataFrame = self.db_helper.get_data(sql)
            columns = list(data.columns)
            columns = {c:'weather_city.'+c for c in columns}
            data = data.rename(columns=columns)
            data.to_csv(file_path)
            # 更新超时时间
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
    def get_dim_goods(self):
        '''货品数据'''
        file_path = os.path.join(self.save_dir,'ur_bi_dw.dim_goods.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(file_path):
                return
            sql = 'SELECT * FROM ur_bi_dw.dim_goods'
            data:pd.DataFrame = self.db_helper.get_data(sql)
            columns = list(data.columns)
            columns = {c:'dim_goods.'+c for c in columns}
            data = data.rename(columns=columns)
            data.to_csv(file_path)
            # 更新超时时间
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
    def get_dim_goods_market_shop_date(self):
        '''店铺商品生命周期数据'''
        file_path = os.path.join(self.save_dir,'ur_bi_dw.dim_goods_market_shop_date.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(file_path):
                return
            # sql = 'SELECT * FROM ur_bi_dw.dim_goods_market_shop_date as goods_shop_date'
            sql = '''
            select shop_no, sku_no, shop_market_date, lifecycle_end_date, lifecycle_days
            FROM ur_bi_dw.dim_goods_market_shop_date
            where lifecycle_end_date is not null
            '''
            data:pd.DataFrame = self.db_helper.get_data(sql)
            columns = list(data.columns)
            columns = {c:c.replace('lifecycle_end_date.','') for c in columns}
            data = data.rename(columns=columns)
            data = data.sort_values(['shop_market_date'])
            data.to_csv(file_path, index=False)
            # 更新超时时间
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
    def get_dim_goods_market_date(self):
        '''全国商品生命周期数据'''
        file_path = os.path.join(self.save_dir,'ur_bi_dw.dim_goods_market_date.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(file_path):
                return
            sql = '''
            select * FROM ur_bi_dw.dim_goods_market_date
            '''
            data:pd.DataFrame = self.db_helper.get_data(sql)
            columns = list(data.columns)
            columns = {c:'dim_goods_market_date.'+c for c in columns}
            data = data.rename(columns=columns)
            data = data.sort_values(['dim_goods_market_date.sku_no'])
            data.to_csv(file_path, index=False)
            # 更新超时时间
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
    def get_dim_goods_color_dev_sizes(self):
        '''商品开发码数数据'''
        file_path = os.path.join(self.save_dir,'dim_goods_color_dev_sizes.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(file_path):
                return
            # sql = 'SELECT * FROM ur_bi_dw.dim_goods_market_shop_date as goods_shop_date'
            sql = 'SELECT * FROM ur_bi_dm.dim_goods_color_dev_sizes'
            data:pd.DataFrame = self.db_helper.get_data(sql)
            columns = list(data.columns)
            columns = {c:c.replace('dim_goods_color_dev_sizes.','') for c in columns}
            data = data.rename(columns=columns)
            data.to_csv(file_path, index=False)
            # 更新超时时间
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
    def get_dwd_daily_sales_size(self):
        '''实际销售金额'''
        sub_dir = os.path.join(self.save_dir,'dwd_daily_sales_size_all')
        now_lock = self.get_lock(sub_dir)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(sub_dir):
                return
            sql = """
            select shop_no,sku_no,date_key,`size`,
                sum(tag_price) as `tag_price`,
                sum(sales_qty) as `sales_qty`,
                sum(sales_tag_amt) as `sales_tag_amt`,
                sum(sales_amt) as `sales_amt`,
                count(0) as `sales_count`
            from ur_bi_dw.dwd_daily_sales_size as sales
            where sales.date_key>=%s and sales.date_key<%s
                and sales.currency_code='CNY'
            group by shop_no,sku_no,date_key,`size`
            """
            sort_columns = ['date_key','shop_no','sku_no']
            self.get_data_of_date(
                save_dir=sub_dir,
                sql=sql,
                sort_columns=sort_columns,
                start_date=datetime.datetime(2017, 1, 1), # 开始时间
            )
            # 更新超时时间
            self.save_last_time(sub_dir)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
    def get_dwd_daily_delivery_size(self):
        '''实际配货金额'''
        sub_dir = os.path.join(self.save_dir,'dwd_daily_delivery_size_all')
        now_lock = self.get_lock(sub_dir)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(sub_dir):
                return
            sql = """
            select shop_no,sku_no,date_key,`size`,
                sum(delivery.shop_distr_received_qty) as `shop_distr_received_qty`,
                sum(delivery.shop_distr_received_amt) as `shop_distr_received_amt`,
                sum(delivery.online_distr_received_qty) as `online_distr_received_qty`,
                sum(delivery.online_distr_received_amt) as `online_distr_received_amt`,
                sum(delivery.pr_received_qty) as `pr_received_qty`,
                count(0) as `delivery_count`
            from ur_bi_dw.dwd_daily_delivery_size as delivery
            where delivery.date_key>=%s and delivery.date_key<%s
                and delivery.currency_code='CNY'
            group by shop_no,sku_no,date_key,`size`
            """
            sort_columns = ['date_key','shop_no','sku_no']
            self.get_data_of_date(
                save_dir=sub_dir,
                sql=sql,
                sort_columns=sort_columns,
                start_date=datetime.datetime(2017, 1, 1), # 开始时间
            )
            # 更新超时时间
            self.save_last_time(sub_dir)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
    def get_v_last_nation_sales_status(self):
        '''商品畅滞销数据'''
        file_path = os.path.join(self.save_dir,'v_last_nation_sales_status.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(file_path):
                return
            sql = 'SELECT * FROM ur_bi_dw.v_last_nation_sales_status'
            data:pd.DataFrame = self.db_helper.get_data(sql)
            columns = list(data.columns)
            columns = {c:c.replace('v_last_nation_sales_status.','') for c in columns}
            data = data.rename(columns=columns)
            data.to_csv(file_path, index=False)
            # 更新超时时间
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
    def get_dwd_daily_finacial_goods(self):
        '''商品成本价数据'''
        file_path = os.path.join(self.save_dir,'dwd_daily_finacial_goods.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(file_path):
                return
            sql = """
            select t1.sku_no,t1.`size`,t1.cost_tax_incl from ur_bi_dw.dwd_daily_finacial_goods as t1
            inner join (
                select sku_no,`size`,max(date_key) as date_key
                from ur_bi_dw.dwd_daily_finacial_goods
                where currency_code='CNY' and country_code='CN'
                group by sku_no,`size`
            ) as t2
            on t2.sku_no=t1.sku_no
                and t2.`size`=t1.`size`
                and t2.date_key=t1.date_key
            where t1.currency_code='CNY' and t1.country_code='CN'
            """
            data:pd.DataFrame = self.db_helper.get_data(sql)
            columns = list(data.columns)
            columns = {c:c.replace('t1.','') for c in columns}
            data = data.rename(columns=columns)
            data.to_csv(file_path, index=False)
            # 更新超时时间
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
    def get_dim_size_group(self):
        '''尺码映射数据'''
        file_path = os.path.join(self.save_dir,'dim_size_group.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(file_path):
                return
            sql = """select * from ur_bi_dw.dim_size_group"""
            data:pd.DataFrame = self.db_helper.get_data(sql)
            columns = list(data.columns)
            columns = {c:c.replace('dim_size_group.','') for c in columns}
            data = data.rename(columns=columns)
            data.to_csv(file_path, index=False)
            # 更新超时时间
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
pass
def get_common_datas(
    host='10.2.32.22',
    port=21051,
    database='ur_ai_dw',
    auth_mechanism='LDAP',
    user='urbi',
    password='Ur#730xd',
    logger:logging.Logger=None):
    # 共用文件
    common_datas_dir = ShareArgs.get_args_value('common_datas_dir')
    common_ur_bi_dir = os.path.join(common_datas_dir, 'ur_bi_data')
    ur_bi_get_datas = UrBiGetDatas(
        host=host,
        port=port,
        database=database,
        auth_mechanism=auth_mechanism,
        user=user,
        password=password,
        save_dir=common_ur_bi_dir,
        logger=logger
    )
    try:
        logger.info('正在查询日期数据...')
        ur_bi_get_datas.get_dim_date()
        logger.info('查询日期数据完成!')
        logger.info('正在查询店铺数据...')
        ur_bi_get_datas.get_dim_shop()
        logger.info('查询店铺数据完成!')
        logger.info('正在查询天气数据...')
        ur_bi_get_datas.get_weather()
        logger.info('查询天气数据完成!')
        logger.info('正在查询天气城市数据...')
        ur_bi_get_datas.get_weather_city()
        logger.info('查询天气城市数据完成!')
        logger.info('正在查询货品数据...')
        ur_bi_get_datas.get_dim_goods()
        logger.info('查询货品数据完成!')
        logger.info('正在查询实际销量数据...')
        ur_bi_get_datas.get_dwd_daily_sales_size()
        logger.info('查询实际销量数据完成!')
    except Exception as ex:
        logger.exception(ex)
        raise ex # 往外抛出异常
    finally:
        ur_bi_get_datas.close()
pass
class CustomUrBiGetDatas(UrBiGetDatasBase):
    def __init__(
        self,
        host='10.2.32.22',
        port=21051,
        database='ur_ai_dw',
        auth_mechanism='LDAP',
        user='urbi',
        password='Ur#730xd',
        save_dir='./hjx/data/ur_bi_data',
        logger:logging.Logger=None
        ):
        self.save_dir = save_dir
        self.logger = logger
        super().__init__(
            host=host,
            port=port,
            database=database,
            auth_mechanism=auth_mechanism,
            user=user,
            password=password,
            save_dir=save_dir,
            logger=logger
            )
    def get_sales_goal_amt(self):
        '''销售目标金额'''
        file_path = os.path.join(self.save_dir,'month_of_year_sales_goal_amt.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(file_path):
                return
            sql = '''
            select sales_goal.shop_no,
                if(sales_goal.serial='Y','W',sales_goal.serial) as `sales_goal.serial`,
                dates.month_of_year,
                sum(sales_goal.sales_goal_amt) as sales_goal_amt
            from ur_bi_dw.dwd_sales_goal_west as sales_goal
            inner join ur_bi_dw.dim_date as dates
                on sales_goal.date_key = dates.date_key
            group by sales_goal.shop_no,
                if(sales_goal.serial='Y','W',sales_goal.serial),
                dates.month_of_year
            '''
            data:pd.DataFrame = self.db_helper.get_data(sql)
            data = data.rename(columns={
                'shop_no':'sales_goal.shop_no',
                'serial':'sales_goal.serial',
                'month_of_year':'dates.month_of_year',
            })
            # 排序
            data = data.sort_values(['sales_goal.shop_no','sales_goal.serial','dates.month_of_year'])
            data.to_csv(file_path)
            # 更新超时时间
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
    def get_shop_serial_area(self):
        '''店-系列面积'''
        file_path = os.path.join(self.save_dir,'shop_serial_area.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(file_path):
                return
            sql = '''
            select shop_serial_area.shop_no,
                if(shop_serial_area.serial='Y','W',shop_serial_area.serial) as `shop_serial_area.serial`,
                shop_serial_area.month_of_year,
                sum(shop_serial_area.area) as `shop_serial_area.area`
            from ur_bi_dw.dwd_shop_serial_area as shop_serial_area
            where shop_serial_area.area is not null
            group by shop_serial_area.shop_no,if(shop_serial_area.serial='Y','W',shop_serial_area.serial),shop_serial_area.month_of_year
            '''
            data:pd.DataFrame = self.db_helper.get_data(sql)
            data = data.rename(columns={
                'shop_no':'shop_serial_area.shop_no',
                'serial':'shop_serial_area.serial',
                'month_of_year':'shop_serial_area.month_of_year',
                'area':'shop_serial_area.area',
            })
            # 排序
            data = data.sort_values(['shop_serial_area.shop_no','shop_serial_area.serial','shop_serial_area.month_of_year'])
            data.to_csv(file_path)
            # 更新超时时间
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
pass
def get_datas(
    host='10.2.32.22',
    port=21051,
    database='ur_ai_dw',
    auth_mechanism='LDAP',
    user='urbi',
    password='Ur#730xd',
    save_dir='./data/sales_forecast/ur_bi_dw_data',
    logger:logging.Logger=None):
    ur_bi_get_datas = CustomUrBiGetDatas(
        host=host,
        port=port,
        database=database,
        auth_mechanism=auth_mechanism,
        user=user,
        password=password,
        save_dir=save_dir,
        logger=logger
    )
    try:
        # 店,系列,品类,年月,销售目标金额
        logger.info('正在查询年月销售目标金额数据...')
        ur_bi_get_datas.get_sales_goal_amt()
        logger.info('查询年月销售目标金额数据完成!')
    except Exception as ex:
        logger.exception(ex)
        raise ex # 往外抛出异常
    finally:
        ur_bi_get_datas.close()
pass
def getdata_ur_bi_dw(
    host='10.2.32.22',
    port=21051,
    database='ur_ai_dw',
    auth_mechanism='LDAP',
    user='urbi',
    password='Ur#730xd',
    save_dir='./data/sales_forecast/ur_bi_dw_data',
    logger=None
):
    get_common_datas(
        host=host,
        port=port,
        database=database,
        auth_mechanism=auth_mechanism,
        user=user,
        password=password,
        logger=logger
    )
    get_datas(
        host=host,
        port=port,
        database=database,
        auth_mechanism=auth_mechanism,
        user=user,
        password=password,
        save_dir=save_dir,
        logger=logger
    )
pass
# 代码入口
# getdata_ur_bi_dw(
#     host=ur_bi_dw_host,
#     port=ur_bi_dw_port,
#     database=ur_bi_dw_database,
#     auth_mechanism=ur_bi_dw_auth_mechanism,
#     user=ur_bi_dw_user,
#     password=ur_bi_dw_password,
#     save_dir=ur_bi_dw_save_dir,
#     logger=logger
#     )

代码说明和领悟

每个类的具体作用说明,代码需要根据下面的文字说明进行“食用”:

(第一层)HiveHelper完成了连接数据库、关闭数据库连接、生成事务、执行、引擎、连接等功能

VarsHelper提供了一个简单的持久化功能,可以将对象以文件的形式存放在磁盘上。并提供设置值、获取值、判断值是否存在的方法

GlobalShareArgs提供了一个字典,并且提供了获取字典、设置字典、设置字典键值对、设置字典键的值、判断键是否在字典中、更新字典等方法

ShareArgs跟GlobalShareArgs类似,只是一开始字典的初始化的键值对比较多

立即学习Python免费学习笔记(深入)”;

(第二层)UrBiGetDataBase类,提供了线程锁字典、时间字典、超时判断字典,都是类变量;使用了HiveHelper类,但注意,不是继承。在具体的sql读数时,提供了线程固定和时间判断

(第三层)UrBiGetDatas类,获取hive数据库那边的日期数据、店铺数据、会员数据、天气数据、天气城市数据、商品数据、店铺生命周期数据、全国商品生命周期数据、商品开发码数数据、实际销售金额、实际配货金额、商品畅滞销数据、商品成本价数据、尺码映射数据等。

AI Web Designer
AI Web Designer

AI网页设计师,快速生成个性化的网站设计

下载

(第四层)get_common_data函数,使用URBiGetData类读取日期、店铺、天气、天气城市、货品、实际销量数据,并缓存到文件夹./yongjian/data/ur_bi_data下面

CustomUrBiGetData类,继承了UrBiGetDatasBase类,读取销售目标金额、点系列面积数据。

(这个也是第四层)get_datas函数,通过CustomUrBiGetData类,读取年月销售目标金额。

总的函数:(这个是总的调用入口函数)get_data_ur_bi_dw函数,调用了get_common_data和get_datas函数进行读取数据,然后将数据保存到某个文件夹目录下面。

举一反三,如果你不是hive数据库,你可以将第一层这个底层更换成mysql。主页有解释如果进行更换。第二层不需要改变,第三层就是你想要进行读取的数据表,不同的数据库你想要读取的数据表也不同,所以sql需要你在这里写,套用里面的方法即可,基本上就是修改sql就好了。

这种方法的好处在于,数据不会重复读取,并且读取的数据都可以得到高效的使用。

后续附上修改成mysql的一个例子代码

import logging
import pandas as pd
from impala.dbapi import connect
import sqlalchemy
from sqlalchemy.orm import sessionmaker
import os
import time
import os
import datetime
from dateutil.relativedelta import relativedelta
from typing import Dict, List
import logging
import threading
import pandas as pd
import pickle
class MySqlHelper(object):
    def __init__(
        self,
        host='192.168.15.144',
        port=3306,
        database='test_ims',
        user='spkjz_writer',
        password='7cmoP3QDtueVJQj2q4Az',
        logger:logging.Logger=None
        ):
        self.host = host
        self.port = port
        self.database = database
        self.user = user
        self.password = password
        self.logger = logger
        self.connection_str = 'mysql+pymysql://%s:%s@%s:%d/%s' %(
            self.user, self.password, self.host, self.port, self.database
        )
        self.conn = None
        self.cursor = None
        self.engine = None
        self.session = None
    def create_table_code(self, file_name):
        '''创建表类代码'''
        os.system(f'sqlacodegen {self.connection_str} > {file_name}')
        return self.conn
    def get_conn(self):
        '''创建连接或获取连接'''
        if self.conn is None:
            engine = self.get_engine()
            self.conn = engine.connect()
        return self.conn
    def get_engine(self):
        '''创建连接或获取连接'''
        if self.engine is None:
            self.engine = sqlalchemy.create_engine(self.connection_str)
        return self.engine
    def get_cursor(self):
        '''创建连接或获取连接'''
        if self.cursor is None:
            self.cursor = self.conn.cursor()
        return self.cursor
    def get_session(self) -> sessionmaker:
        '''创建连接或获取连接'''
        if self.session is None:
            engine = self.get_engine()
            Session = sessionmaker(bind=engine)
            self.session = Session()
        return self.session
    def close_conn(self):
        '''关闭连接'''
        if self.conn is not None:
            self.conn.close()
            self.conn = None
        self.dispose_engine()
    def close_session(self):
        '''关闭连接'''
        if self.session is not None:
            self.session.close()
            self.session = None
        self.dispose_engine()
    def dispose_engine(self):
        '''释放engine'''
        if self.engine is not None:
            # self.engine.dispose(close=False)
            self.engine.dispose()
            self.engine = None
    def close_cursor(self):
        '''关闭cursor'''
        if self.cursor is not None:
            self.cursor.close()
            self.cursor = None
    def get_data(self, sql, auto_close=True) -> pd.DataFrame:
        '''查询数据'''
        conn = self.get_conn()
        data = None
        try:
            # 异常重试3次
            for i in range(3):
                try:
                    data = pd.read_sql(sql, conn)
                    break
                except Exception as ex:
                    if i == 2:
                        raise ex # 往外抛出异常
                    time.sleep(60) # 一分钟后重试
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            if auto_close:
                self.close_conn()
        return data
pass
class VarsHelper():
    def __init__(self, save_dir, auto_save=True):
        self.save_dir = save_dir
        self.auto_save = auto_save
        self.values = {}
        if not os.path.exists(os.path.dirname(self.save_dir)):
            os.makedirs(os.path.dirname(self.save_dir))
        if os.path.exists(self.save_dir):
            with open(self.save_dir, 'rb') as f:
                self.values = pickle.load(f)
                f.close()
    def set_value(self, key, value):
        self.values[key] = value
        if self.auto_save:
            self.save_file()
    def get_value(self, key):
        return self.values[key]
    def has_key(self, key):
        return key in self.values.keys()
    def save_file(self):
        with open(self.save_dir, 'wb') as f:
            pickle.dump(self.values, f)
            f.close()
pass
class GlobalShareArgs():
    args = {
        "debug": False
    }
    def get_args():
        return GlobalShareArgs.args
    def set_args(args):
        GlobalShareArgs.args = args
    def set_args_value(key, value):
        GlobalShareArgs.args[key] = value
    def get_args_value(key, default_value=None):
        return GlobalShareArgs.args.get(key, default_value)
    def contain_key(key):
        return key in GlobalShareArgs.args.keys()
    def update(args):
        GlobalShareArgs.args.update(args)
pass
class ShareArgs():
    args = {
        "labels_dir":"./hjx/shop_group/month_w_amt/data/labels", # 标签目录
        "labels_output_dir":"./hjx/shop_group/month_w_amt/data/labels_output", # 聚类导出标签目录
        "common_datas_dir":"./hjx/data", # 共用数据目录。ur_bi_dw的公共
        "only_predict": False, # 只识别,不训练
        "delete_model": True, # 先删除模型,仅在训练时使用
        "export_excel": False, # 导出excel
        "classes": 12, # 聚类数
        "batch_size": 16,
        "hidden_size": 32,
        "max_nrof_epochs": 100,
        "learning_rate": 0.0005,
        "loss_type": "categorical_crossentropy",
        "avg_model_num": 10,
        "steps_per_epoch": 4.0, # 4.0
        "lr_callback_patience": 4, 
        "lr_callback_cooldown": 1,
        "early_stopping_callback_patience": 6,
        "get_data": True,
    }
    def get_args():
        return ShareArgs.args
    def set_args(args):
        ShareArgs.args = args
    def set_args_value(key, value):
        ShareArgs.args[key] = value
    def get_args_value(key, default_value=None):
        return ShareArgs.args.get(key, default_value)
    def contain_key(key):
        return key in ShareArgs.args.keys()
    def update(args):
        ShareArgs.args.update(args)
pass
class IMSGetDatasBase():
    # 线程锁列表,同保存路径共用锁
    lock_dict:Dict[str, threading.Lock] = {}
    # 时间列表,用于判断是否超时
    time_dict:Dict[str, datetime.datetime] = {}
    # 用于记录是否需要更新超时时间
    get_data_timeout_dict:Dict[str, bool] = {}
    def __init__(
        self,
        host='192.168.15.144',
        port=3306,
        database='test_ims',
        user='spkjz_writer',
        password='Ur#7cmoP3QDtueVJQj2q4Az',
        save_dir=None,
        logger:logging.Logger=None,
        ):
        self.save_dir = save_dir
        self.logger = logger
        self.db_helper = MySqlHelper(
            host=host,
            port=port,
            database=database,
            user=user,
            password=password,
            logger=logger
            )
        # 创建子目录
        if self.save_dir is not None and not os.path.exists(self.save_dir):
            os.makedirs(self.save_dir)
        self.vars_helper = None
        if GlobalShareArgs.get_args_value('debug'):
            self.vars_helper = VarsHelper('./hjx/data/vars/IMSGetDatas') # 把超时时间保存到文件,注释该行即可停掉,只用于调试
    def close(self):
        '''关闭连接'''
        self.db_helper.close_conn()
    def get_last_time(self, key_name) -> bool:
        '''获取是否超时'''
        # 转静态路径,确保唯一性
        key_name = os.path.abspath(key_name)
        if self.vars_helper is not None and self.vars_helper.has_key('IMSGetDatasBase.time_list'):
            IMSGetDatasBase.time_dict = self.vars_helper.get_value('IMSGetDatasBase.time_list')
        timeout = 12 # 12小时
        if GlobalShareArgs.get_args_value('debug'):
            timeout = 24 # 24小时
        get_data_timeout = False
        if key_name not in IMSGetDatasBase.time_dict.keys() or (datetime.datetime.today() - IMSGetDatasBase.time_dict[key_name]).total_seconds()>(4*60*60):
            self.logger.info('超时%d小时,重新查数据:%s', timeout, key_name)
            # IMSGetDatasBase.time_list[key_name] = datetime.datetime.today()
            get_data_timeout = True
        else:
            self.logger.info('未超时%d小时,跳过查数据:%s', timeout, key_name)
        # if self.vars_helper is not None :
        #     self.vars_helper.set_value('IMSGetDatasBase.time_list', IMSGetDatasBase.time_list)
        IMSGetDatasBase.get_data_timeout_dict[key_name] = get_data_timeout
        return get_data_timeout
    def save_last_time(self, key_name):
        '''更新状态超时'''
        # 转静态路径,确保唯一性
        key_name = os.path.abspath(key_name)
        if IMSGetDatasBase.get_data_timeout_dict[key_name]:
            IMSGetDatasBase.time_dict[key_name] = datetime.datetime.today()
        if self.vars_helper is not None :
            IMSGetDatasBase.time_dict[key_name] = datetime.datetime.today()
            self.vars_helper.set_value('IMSGetDatasBase.time_list', IMSGetDatasBase.time_dict)
    def get_lock(self, key_name) -> threading.Lock:
        '''获取锁'''
        # 转静态路径,确保唯一性
        key_name = os.path.abspath(key_name)
        if key_name not in IMSGetDatasBase.lock_dict.keys():
            IMSGetDatasBase.lock_dict[key_name] = threading.Lock()
        return IMSGetDatasBase.lock_dict[key_name]
    def get_data_of_date(
        self,
        save_dir,
        sql,
        sort_columns:List[str],
        del_index_list=[-1], # 删除最后下标
        start_date = datetime.datetime(2017, 1, 1), # 开始时间
        offset = relativedelta(months=3), # 时间间隔
        date_format_fun = lambda d: '%04d%02d01' % (d.year, d.month), # 查询语句中替代时间参数的格式化
        filename_format_fun = lambda d: '%04d%02d.csv' % (d.year, d.month), # 查询语句中替代时间参数的格式化
        stop_date = '20700101', # 超过时间则停止
        ):
        '''分时间增量读取数据'''
        # 创建文件夹
        if not os.path.exists(save_dir):
            os.makedirs(save_dir)
        else:
            #删除最后一个文件
            file_list = os.listdir(save_dir)
            if len(file_list)>0:
                file_list.sort()
                for del_index in del_index_list:
                    os.remove(os.path.join(save_dir,file_list[del_index]))
                    print('删除最后一个文件:', file_list[del_index])
        select_index = -1
        # start_date = datetime.datetime(2017, 1, 1)
        while True:
            end_date = start_date + offset
            start_date_str = date_format_fun(start_date)
            end_date_str = date_format_fun(end_date)
            self.logger.info('date: %s-%s', start_date_str, end_date_str)
            file_path = os.path.join(save_dir, filename_format_fun(start_date))
            # self.logger.info('file_path: %s', file_path)
            if not os.path.exists(file_path):
                data:pd.DataFrame = self.db_helper.get_data(sql % (start_date_str, end_date_str))
                if data is None:
                    break
                self.logger.info('data: %d', len(data))
                # self.logger.info('data: %d', data.columns)
                if len(data)>0:
                    select_index+=1
                    # 排序
                    data = data.sort_values(sort_columns)
                    data.to_csv(file_path)
                elif select_index!=-1:
                    break
                elif stop_date < start_date_str:
                    raise Exception("读取数据异常,时间超出最大值!")
            start_date = end_date
pass
class CustomIMSGetDatas(IMSGetDatasBase):
    def __init__(
        self,
        host='192.168.13.134',
        port=4000,
        database='test_ims',
        user='root',
        password='rootimmsadmin',
        save_dir='./hjx/data/export_ims_data',
        logger:logging.Logger=None
        ):
        self.save_dir = save_dir
        self.logger = logger
        super().__init__(
            host=host,
            port=port,
            database=database,
            user=user,
            password=password,
            save_dir=save_dir,
            logger=logger
            )
    def get_ims_w_amt_pro(self):
        '''年月系列占比数据'''
        file_path = os.path.join(self.save_dir,'ims_w_amt_pro.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            # if not self.get_last_time(file_path):
            #     return
            sql = 'SELECT * FROM ims_w_amt_pro'
            data:pd.DataFrame = self.db_helper.get_data(sql)
            data = data.rename(columns={
                'serial_forecast_proportion': 'forecast_proportion',
            })
            data.to_csv(file_path)
            # # 更新超时时间
            # self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
pass
def get_datas(
    host='192.168.13.134',
    port=4000,
    database='test_ims',
    user='root',
    password='rootimmsadmin',
    save_dir='./hjx/data/export_ims_data',
    logger:logging.Logger=None
    ):
    ur_bi_get_datas = CustomIMSGetDatas(
        host=host,
        port=port,
        database=database,
        user=user,
        password=password,
        save_dir=save_dir,
        logger=logger
    )
    try:
        # 年月系列占比数据
        logger.info('正在查询年月系列占比数据...')
        ur_bi_get_datas.get_ims_w_amt_pro()
        logger.info('查询年月系列占比数据完成!')
    except Exception as ex:
        logger.exception(ex)
        raise ex # 往外抛出异常
    finally:
        ur_bi_get_datas.close()
pass
def getdata_export_ims(
    host='192.168.13.134',
    port=4000,
    database='test_ims',
    user='root',
    password='rootimmsadmin',
    save_dir='./hjx/data/export_ims_data',
    logger:logging.Logger=None
    ):
    get_datas(
        host=host,
        port=port,
        database=database,
        user=user,
        password=password,
        save_dir=save_dir,
        logger=logger
    )
pass

相关文章

python速学教程(入门到精通)
python速学教程(入门到精通)

python怎么学习?python怎么入门?python在哪学?python怎么学才快?不用担心,这里为大家提供了python速学教程(入门到精通),有需要的小伙伴保存下载就能学习啦!

下载

本站声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

WorkBuddy
WorkBuddy

腾讯云推出的AI原生桌面智能体工作台

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
数据分析工具有哪些
数据分析工具有哪些

数据分析工具有Excel、SQL、Python、R、Tableau、Power BI、SAS、SPSS和MATLAB等。详细介绍:1、Excel,具有强大的计算和数据处理功能;2、SQL,可以进行数据查询、过滤、排序、聚合等操作;3、Python,拥有丰富的数据分析库;4、R,拥有丰富的统计分析库和图形库;5、Tableau,提供了直观易用的用户界面等等。

1134

2023.10.12

SQL中distinct的用法
SQL中distinct的用法

SQL中distinct的语法是“SELECT DISTINCT column1, column2,...,FROM table_name;”。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

340

2023.10.27

SQL中months_between使用方法
SQL中months_between使用方法

在SQL中,MONTHS_BETWEEN 是一个常见的函数,用于计算两个日期之间的月份差。想了解更多SQL的相关内容,可以阅读本专题下面的文章。

381

2024.02.23

SQL出现5120错误解决方法
SQL出现5120错误解决方法

SQL Server错误5120是由于没有足够的权限来访问或操作指定的数据库或文件引起的。想了解更多sql错误的相关内容,可以阅读本专题下面的文章。

2194

2024.03.06

sql procedure语法错误解决方法
sql procedure语法错误解决方法

sql procedure语法错误解决办法:1、仔细检查错误消息;2、检查语法规则;3、检查括号和引号;4、检查变量和参数;5、检查关键字和函数;6、逐步调试;7、参考文档和示例。想了解更多语法错误的相关内容,可以阅读本专题下面的文章。

380

2024.03.06

oracle数据库运行sql方法
oracle数据库运行sql方法

运行sql步骤包括:打开sql plus工具并连接到数据库。在提示符下输入sql语句。按enter键运行该语句。查看结果,错误消息或退出sql plus。想了解更多oracle数据库的相关内容,可以阅读本专题下面的文章。

1703

2024.04.07

sql中where的含义
sql中where的含义

sql中where子句用于从表中过滤数据,它基于指定条件选择特定的行。想了解更多where的相关内容,可以阅读本专题下面的文章。

586

2024.04.29

sql中删除表的语句是什么
sql中删除表的语句是什么

sql中用于删除表的语句是drop table。语法为drop table table_name;该语句将永久删除指定表的表和数据。想了解更多sql的相关内容,可以阅读本专题下面的文章。

440

2024.04.29

TypeScript类型系统进阶与大型前端项目实践
TypeScript类型系统进阶与大型前端项目实践

本专题围绕 TypeScript 在大型前端项目中的应用展开,深入讲解类型系统设计与工程化开发方法。内容包括泛型与高级类型、类型推断机制、声明文件编写、模块化结构设计以及代码规范管理。通过真实项目案例分析,帮助开发者构建类型安全、结构清晰、易维护的前端工程体系,提高团队协作效率与代码质量。

26

2026.03.13

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
最新Python教程 从入门到精通
最新Python教程 从入门到精通

共4课时 | 22.5万人学习

Django 教程
Django 教程

共28课时 | 5万人学习

SciPy 教程
SciPy 教程

共10课时 | 1.9万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号