Python使用免费天气API,获取全球任意地区的天气情况

秦俊发
2023-12-01

需求背景:

公司是做外贸服装的,在亚马逊平台上有多个地区店铺运营,运营人员需要参考地区的天气情况,上新的服装.所以需要能够获取全球任意地区的天气情况.还需要预测未来10-15天的天气情况.

选型API:

天气API中有大把免费的api,如:国内的心知天气,国际的雅虎,还有今天的主角:wunderground

最终选择了wunderground,原因:1,需求是全球任意地区的(国内API请求国外地区需要收费才能访问), 2.wunderground提供是信息最全,最丰富的天气api.雅虎提供的天气API信息非常之简略.


直入主题:

官方API文档

这里的免费api只是说测试账号每天有500次的免费请求,要是公司需求大的话,那么就需要付费了.官网价格

准备工作,你需要在官网注册一个账号,然后随意打开一个API的文档, 你会见到

http://api.wunderground.com/api/Your_Key/conditions/q/CA/San_Francisco.json

当中Your_Key的位置有一串key.请保管记住.


python代码:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 18年3月6日 下午1:53
# @Author  : dongyouyuan
# @email   : 15099977712@163.com
# @File    : weatherApi.py
# @Software: PyCharm
#
# 通过全球天气预报API:http://api.wunderground.com(信息最多最全) 来获取信息

import requests
import json
import datetime
import time
import operator


def timeStamp_to_dataTime(timestamp, format='%Y-%m-%d'):
    """
    时间戳转换成格式化的时间
    :param timestamp: 时间戳
    :param format: 格式
    :return:
    """
    if timestamp is None or not timestamp:
            return ''
    return datetime.datetime.fromtimestamp(int(timestamp)).strftime(format)


def dataTime_to_timeStamp(data_tiem, format='%Y-%m-%d %H:%M'):
    """
    格式化时间格式转换成时间戳
    :param data_tiem: 格式化时间
    :param format: 格式
    :return:
    """
    if not data_tiem:
        return 0
    else:
        try:
            time_array = time.strptime(data_tiem, format)
            time_stamp = int(time.mktime(time_array))
            return time_stamp
        except Exception as error_msg:
            print(error_msg)
            return 0


def get_recent_10_days(date):
    """
    获取前10天的时间格式 20180306
    :return: 返回列表
    """

    date_list = list()
    for i in range(1, 11):
        tmp_date = (datetime.datetime.strptime(date, "%Y%m%d") - datetime.timedelta(days=i)).strftime('%Y%m%d')
        date_list.append(tmp_date)
    return date_list


def sorted_for_list_dict(list_dict, key, reverse=False):
    """
    为列表排序,按照元素中dict的key
    :param list_dict:
    :param key:
    :return:
    """
    return sorted(list_dict, key=operator.itemgetter(key), reverse=reverse)


class Weather(object):
    def __init__(self):
        self.search_url = 'http://autocomplete.wunderground.com/aq'
        self._url = 'http://api.wunderground.com/api/'
        self._key = ''  # Your_ key
        self.url = self._url + self._key + '/'
        self.timeout = 10

    def search(self, query):
        """
        搜索地名,返回结果列表
        :param query:
        :return:
        """
        params = {"query": query}
        headers = {"Content-Type": "application/json"}
        r = requests.get(url=self.search_url, params=params, headers=headers, timeout=self.timeout)
        if r.status_code == 200:
            return r.json().get('RESULTS', None)
        else:
            print('请求失败,失败代码:{}'.format(r.status_code))
            return None

    def get_today(self, zmw):
        """
        根据zmw码获取今天的天气信息
        :param zmw:
        :return:
        """
        # temp 温度
        # humidity 湿度
        # weather 天气情况
        # wind 风
        # dewpoint 露点
        # UV 紫外线强度
        # pressure 压力

        url = self.url + 'conditions' + '/q/zmw:{}.json'.format(zmw)

        headers = {"Content-Type": "application/json"}
        r = requests.get(url=url, headers=headers, timeout=self.timeout)
        if r.status_code == 200:
            # print(r.json())
            return r.json().get('current_observation')
        else:
            print('请求失败,失败代码:{}'.format(r.status_code))
            return None

    def get_hourly_today(self, zmw):
        """
        根据zmw码获取今天每小时的天气信息
        :param zmw:
        :return:
        """
        url = self.url + 'hourly' + '/q/zmw:{}.json'.format(zmw)

        headers = {"Content-Type": "application/json"}
        r = requests.get(url=url, headers=headers, timeout=self.timeout)
        if r.status_code == 200:
            # print(r.json())
            return r.json().get('hourly_forecast')
        else:
            print('请求失败,失败代码:{}'.format(r.status_code))
            return None

    def _get_forecast_10day(self, zmw):
        """
        根据zmw码获取未来10天的天气预报
        :param zmw:
        :return:
        """
        # snow 降雪 (白天,晚上,全天)
        # wind 风 (最大,最小)
        # humidity 湿度(最大,最小,平均)
        # conditions 天气情况 icon icon_url
        # qpf 降水量(白天,晚上,全天)
        # low, high 温度(最高,最低)
        # pop 降水概率

        url = self.url + 'forecast10day' + '/q/zmw:{}.json'.format(zmw)
        headers = {"Content-Type": "application/json"}
        r = requests.get(url=url, headers=headers, timeout=self.timeout)
        if r.status_code == 200:
            return sorted_for_list_dict(r.json().get('forecast').get('simpleforecast').get('forecastday'), 'period')
        else:
            print('请求失败,失败代码:{}'.format(r.status_code))
            return None

    def get_forecast_10day(self, zmw):
        """
        根据zmw码获取未来10天的天气预报(并且整理数据输出)
        :param zmw:
        :return:
        """
        f_day_list = self._get_forecast_10day(zmw)
        if f_day_list:
            result_list = list()
            for f_day in f_day_list:
                forecast_day = dict()
                date_time = "{}{}{}".format(f_day['date']['year'], f_day['date']['month'], f_day['date']['day'])

                time_stamp = dataTime_to_timeStamp(date_time, format='%Y%m%d')
                date_format = timeStamp_to_dataTime(time_stamp, format='%Y%m%d')
                # 当天日期
                forecast_day['date_format'] = date_format

                # 风
                wind = {
                    'ave': f_day.get('avewind', {}),
                    'max': f_day.get('maxwind', {})
                }
                forecast_day['wind'] = wind

                # 雪
                snow = {
                    'day': f_day.get('snow_day', {}),
                    'night': f_day.get('snow_night', {}),
                    'allday': f_day.get('snow_allday', {})
                }
                forecast_day['snow'] = snow

                # 湿度
                humidity = {
                    'min': f_day.get('minhumidity', 0),
                    'max': f_day.get('maxhumidity', 0),
                    'ave': f_day.get('avehumidity', 0)
                }
                forecast_day['humidity'] = humidity

                # 天气情况
                conditions = {
                    'icon_url': f_day.get('icon_url', ''),
                    'icon': f_day.get('icon', ''),
                    'text': f_day.get('conditions', ''),
                    'skyicon': f_day.get('skyicon', '')
                }
                forecast_day['conditions'] = conditions

                # 气压
                qpf = {
                    'day': f_day['qpf_day'],
                    'night': f_day['qpf_night'],
                    'allday': f_day['qpf_allday']
                }
                forecast_day['qpf'] = qpf

                # 气温
                temp = {
                    'low': f_day.get('low', {'celsius': '0', 'fahrenheit': '0'}),
                    'high': f_day.get('high', {'celsius': '0', 'fahrenheit': '0'})
                }
                forecast_day['temp'] = temp
                forecast_day['pop'] = f_day.get('pop', 0)
                forecast_day['date'] = f_day.get('date', {})
                result_list.append(forecast_day)
            return result_list
        else:
            return None

    def get_hourly_10day(self, zmw):
        """
        根据zmw码获取未来10天,每小时的天气预报
        :param zmw:
        :return:
        """
        url = self.url + 'hourly10day' + '/q/zmw:{}.json'.format(zmw)
        headers = {"Content-Type": "application/json"}
        r = requests.get(url=url, headers=headers, timeout=self.timeout)
        if r.status_code == 200:
            # print(r.json())
            return r.json()
        else:
            print('请求失败,失败代码:{}'.format(r.status_code))
            return None

    def _get_history(self, zmw, date):
        """
        根据zmw码获取历史天气
        :param zmw:
        :param dates: 查询时间 20170608
        :return:
        """

        url = self.url + 'history_{}'.format(date) + '/q/zmw:{}.json'.format(zmw)
        headers = {"Content-Type": "application/json"}
        r = requests.get(url=url, headers=headers, timeout=self.timeout)
        if r.status_code == 200:
            # print(r.json())
            return r.json()
        else:
            print('请求失败,失败代码:{}'.format(r.status_code))
            return None

    def get_history(self, zmw, date):
        """
        根据zmw码获取历史天气(并且整理数据输出)
        :param zmw:
        :param dates: 查询时间 20170608
        :return:
        """
        h_day = self._get_history(zmw, date)
        if h_day['history']['dailysummary']:
            h_day = h_day['history']['dailysummary'][0]
            history_day = dict()
            # 当天日期
            history_day['date_format'] = date

            # 风
            wind = {
                'ave': {'dir': h_day.get('meanwdire', ''), 'degrees': h_day.get('meanwdird', ''),
                        'kph': h_day.get('meanwindspdi', ''), 'mph': h_day.get('meanwindspdm', '')},
                'max': {'dir': '', 'degrees': '0', 'kph': '0', 'mph': '0'}
            }
            history_day['wind'] = wind

            # 雪
            snow = {
                'day': {'cm': h_day.get('snowdepthm', ''), 'in': h_day.get('snowdepthi', '')},
                'night': {'cm': '', 'in': ''},
                'allday': {'cm': '', 'in': ''}
            }
            history_day['snow'] = snow

            # 湿度
            humidity = {
                'min': h_day.get('minhumidity', ''),
                'max': h_day.get('maxhumidity', ''),
                'ave': h_day.get('humidity', '')
            }
            history_day['humidity'] = humidity

            # 天气情况
            conditions = {
                'icon_url': '',
                'icon': '',
                'text': '',
                'skyicon': '',

                'tornado': h_day.get('tornado', '0'),
                'fog': h_day.get('fog', '0'),
                'rain': h_day.get('rain', '0'),
                'snow': h_day.get('snow', '0'),
                'thunder': h_day.get('thunder', '0')
            }
            history_day['conditions'] = conditions

            # 气压
            qpf = {
                'day': {'mm': h_day.get('maxpressurem', '0.0'), 'in': h_day.get('maxpressurei', '0.0')},
                'night': {'mm': h_day.get('minpressurem', '0.0'), 'in': h_day.get('minpressurei', '0.0')},
                'allday': {'mm': h_day.get('meanpressurem', '0.0'), 'in': h_day.get('meanpressurei', '0.0')}
            }
            history_day['qpf'] = qpf

            # 温度
            temp = {
                'low': {'celsius': h_day.get('mintempm', '0'), 'fahrenheit': h_day.get('mintempi', '0')},
                'high': {'celsius': h_day.get('maxtempm', '0'), 'fahrenheit': h_day.get('maxtempi', '0')}
            }
            history_day['temp'] = temp

            # 可见度
            visi = {
                'min': {'km': h_day.get('minvism', '0.0'), 'mi': h_day.get('minvisi', '0.0')},
                'mean': {'km': h_day.get('meanvism', '0.0'), 'mi': h_day.get('meanvisi', '0.0')},
                'max': {'km': h_day.get('maxvism', '0.0'), 'mi': h_day.get('maxvisi', '0.0')}
            }
            history_day['visi'] = visi

            # 时间信息
            history_day['date'] = h_day.get('date', {})
            return history_day
        else:
            return None

    def _get_history_10day(self, zmw, date):
        """
        根据zmw码获取10天的历史天气(不包括传入天)
        :param zmw:
        :param dates: 查询时间 20170608
        :return:
        """
        date_list = get_recent_10_days(date)
        result_list = list()
        for history_date in date_list:
            result_list.append(self._get_history(zmw=zmw, date=history_date))
        return result_list

    def get_history_10day(self, zmw, date):
        """
        根据zmw码获取10天的历史天气(不包括传入天)(并且整理输出)
        :param zmw:
        :param dates: 查询时间 20170608
        :return:
        """
        date_list = get_recent_10_days(date)
        result_list = list()
        for history_date in date_list:
            result_list.append(self.get_history(zmw=zmw, date=history_date))
        return result_list


if __name__ == '__main__':
    weather = Weather()
    search_result = weather.search('Paris')
    if search_result:
        zmw = search_result[0].get('zmw')
        if zmw:
            print('*'*100)
            # 获取未来10天的天气预告
            f_10daty = weather.get_forecast_10day(zmw=zmw)
            print(json.dumps(f_10daty, indent=4))
            print(len(f_10daty))

            # 获取某一天的历史天气
            # h_10daty = weather.get_history(zmw=zmw, date='20180306')
            # print(json.dumps(h_10daty, indent=4))
            # print(json.dumps(h_10daty, indent=4))
            # print(len(h_10daty))

            # 获取某天前10天的历史天气(不包括传入天)
            h_10daty = weather.get_history_10day(zmw=zmw, date='20180306')
            print(json.dumps(h_10daty, indent=4))
            print(len(h_10daty))


代码当中封装了Weather类,你需要把刚才注册得到的key放到类的key中去.

代码解析:

类中并没有按照官网API(/q/地区/地名.json)的方法去做,因为发现这样得出的结果往往不准确的.而是通过查询你需要的字符,你自己获得到zmw(这是个唯一的编码)

然后再通过此码去查询.然后在方法 get_forecast_10day(),get_history(), get_history_10day()封装了一下,原因是官网的不同api获取的天气信息所叫的名称不一样(变量名)

这样用起来有点麻烦,所以统一整理,封装了一下.


 类似资料: