GoogleAPIClient Gmail Overview#
Overview#
本文档记录了我如何使用 Gmail API 来读取邮件内容并自动化发送邮件.
Examples#
test_gmail_client.py
1# -*- coding: utf-8 -*-
2
3import json
4from pathlib import Path
5
6from google.auth.transport.requests import Request
7from google.oauth2.credentials import Credentials
8from google_auth_oauthlib.flow import InstalledAppFlow
9from googleapiclient.discovery import build
10from googleapiclient.errors import HttpError
11
12SCOPES = ["https://www.googleapis.com/auth/gmail.readonly"]
13
14dir_here = Path(__file__).absolute().parent
15path_temp_json = dir_here / "tmp.json"
16
17dir_home = Path.home()
18dir_google = dir_home / ".google"
19dir_google.mkdir(parents=True, exist_ok=True)
20
21path_client_secrets = (
22 dir_google / "send_and_receive_email_via_gmail_poc_client_secrets.json"
23)
24# If modifying these scopes, delete the file send_and_receive_email_via_gmail_poc_token.json.
25path_token = dir_google / "send_and_receive_email_via_gmail_poc_token.json"
26
27
28def auth():
29 creds = None
30 # The file token.json stores the user's access and refresh tokens, and is
31 # created automatically when the authorization flow completes for the first
32 # time.
33 if path_token:
34 creds = Credentials.from_authorized_user_file(str(path_token), SCOPES)
35 # If there are no (valid) credentials available, let the user log in.
36 if not creds or not creds.valid:
37 if creds and creds.expired and creds.refresh_token:
38 creds.refresh(Request())
39 else:
40 flow = InstalledAppFlow.from_client_secrets_file(
41 str(path_client_secrets),
42 SCOPES,
43 )
44 creds = flow.run_local_server(port=0)
45 # Save the credentials for the next run
46 path_token.write_text(creds.to_json())
47
48 service = build("gmail", "v1", credentials=creds)
49 return service
50
51
52def list_threads(service):
53 res = service.users().messages().list(userId="me").execute()
54 path_temp_json.write_text(json.dumps(res, indent=4))
55
56
57def get_thread_details(service, thread_id: str):
58 res = service.users().threads().get(userId="me", id=thread_id).execute()
59 path_temp_json.write_text(json.dumps(res, indent=4))
60
61
62if __name__ == "__main__":
63 service = auth()
64 list_threads(service)
65 get_thread_details(service, thread_id="18ec9466c9dc95e2")