file one code:- name of the file - google_apis.py

import os
import datetime
from collections import namedtuple
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request

def create_service(client_secret_file, api_name, api_version, *scopes, prefix=''):
CLIENT_SECRET_FILE = client_secret_file
API_SERVICE_NAME = api_name
API_VERSION = api_version
SCOPES = [scope for scope in scopes[0]]
creds = None
working_dir = os.getcwd()
token_dir = '/Users/jitendersingh/Downloads/token'
token_file = f'token_{API_SERVICE_NAME}_{API_VERSION}{prefix}.json'

### Check if token dir exists first, if not, create the folder
if not os.path.exists(os.path.join(working_dir, token_dir)):
os.mkdir(os.path.join(working_dir, token_dir))

if os.path.exists(os.path.join(working_dir, token_dir, token_file)):
creds = Credentials.from_authorized_user_file(os.path.join(working_dir, token_dir, token_file), SCOPES)
# with open(os.path.join(working_dir, token_dir, token_file), 'rb') as token:
# cred = pickle.load(token)

if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(CLIENT_SECRET_FILE, SCOPES)
creds = flow.run_local_server(port=0)

with open(os.path.join(working_dir, token_dir, token_file), 'w') as token:
token.write(creds.to_json())

try:
service = build(API_SERVICE_NAME, API_VERSION, credentials=creds, static_discovery=False)
print(API_SERVICE_NAME, API_VERSION, 'service created successfully, means token is valid')
return service
except Exception as e:
print(e)
print(f'Failed to create service instance for {API_SERVICE_NAME}, means token is creating issue')
os.remove(os.path.join(working_dir, token_dir, token_file))
return None

def convert_to_RFC_datetime(year=1900, month=1, day=1, hour=0, minute=0):
dt = datetime.datetime(year, month, day, hour, minute, 0).isoformat() + 'Z'
return dt

File 2 Code:- 

import datetime
import time
from googleapiclient.http import MediaFileUpload
import pandas as pd
from google_apis import create_service
from googleapiclient.discovery import build


def video_categories():
video_categories = service.videoCategories().list(part='snippet', regionCode='US').execute()
df = pd.DataFrame(video_categories.get('items'))
return pd.concat([df['id'], df['snippet'].apply(pd.Series)[['title']]], axis=1)

API_NAME = 'youtube'
API_VERSION = 'v3'
SCOPES = ['https://www.googleapis.com/auth/youtube']
# SCOPES = ['https://www.googleapis.com/auth/youtube.upload']
client_file = '/Users/jitendersingh/Downloads/client_secrets.json'
service = create_service(client_file, API_NAME, API_VERSION, SCOPES)

# print(video_categories())

"""
Step 1. Uplaod Video
"""
print("uploading video")
upload_time = (datetime.datetime.now() + datetime.timedelta(days=10)).isoformat() + '.000Z'
request_body = {
'snippet': {
'title': 'Hello Testing',
'description': 'video description',
'categoryId': '28',
'tags': ['Hello, Hi, How are you,']
},
'status': {
'privacyStatus': 'public',
'publishedAt': upload_time,
'selfDeclaredMadeForKids': False
},
'notifySubscribers': False
}

video_file = '/Users/jitendersingh/Desktop/best-things.mp4'
media_file = MediaFileUpload(video_file)
# print(media_file.size() / pow(1024, 2), 'mb')
# print(media_file.to_json())
# print(media_file.mimetype())

response_video_upload = service.videos().insert(
part='snippet,status',
body=request_body,
media_body=media_file
).execute()
uploaded_video_id = response_video_upload.get('id')


"""
Step 2. Update video thumbnail
"""
print("uploading thumbnail")
response_thumbnail_upload = service.thumbnails().set(
videoId=uploaded_video_id,
media_body=MediaFileUpload('/Users/jitendersingh/Desktop/best-drill-machine.jpg')
).execute()




# """
# Step 4. Change the video title
# """
# print("Changing video title")

# # Specify the video ID for which you want to change the title
# video_id = "wrdhEpOLyNQ" # Replace with the actual video ID

# # Retrieve the video resource
# video = service.videos().list(
# part="snippet",
# id=video_id
# ).execute()

# # Update the title
# video['items'][0]['snippet']['title'] = "NEW_VIDEO_TITLE" # Replace with the new title

# # Call the videos().update() method to update the video resource
# response_update_title = service.videos().update(
# part='snippet',
# body=video['items'][0]
# ).execute()

# print("Video title changed successfully")
# print("Everything done")


print("Everything done")


# """
# Step 3 (optional). Set video privacy status to "Public"
# """
# video_id = uploaded_video_id

# counter = 0
# response_update_video = service.videos().list(id=video_id, part='status').execute()
# update_video_body = response_update_video['items'][0]

# while 10 > counter:
# if update_video_body['status']['uploadStatus'] == 'processed':
# update_video_body['status']['privacyStatus'] = 'public'
# service.videos().update(
# part='status',
# body=update_video_body
# ).execute()
# print('Video {0} privacy status is updated to "{1}"'.format(update_video_body['id'], update_video_body['status']['privacyStatus']))
# break
# # adjust the duration based on your video size
# time.sleep(10)
# response_update_video = service.videos().list(id=video_id, part='status').execute()
# update_video_body = response_update_video['items'][0]
# counter += 1

0 comments:

Post a Comment

 
Top