데이터 사이언스/데이터 엔지니어링

T아카데미 - 데이터 엔지니어링 기초 (3)Google Cloud Pub/Sub으로 데이터 보내기

B1001101 2022. 5. 15. 23:57

강의영상

강의 요약

  • 프로젝트 생성: https://cloud.google.com/resource-manager/docs/creating-managing-projects?hl=ko
  • google-cloud-pubsub 라이브러리 설치
  • 인증키 생성
    1. IAM 및 관리자 - 서비스 계정 만들기
    2. 역할: 게시/구독 편집자, BigQuery 데이터 편집자
    3. 키 만들기 -> json 파일 다운로드해서 프로젝트 폴더에 넣음
  • 저장된 데이터 확인하는 방법: subscriber 생성 - 메시지 보기

실습 코드

import tweepy
from google.cloud import pubsub_v1
from google.oauth2 import service_account
from dotenv import load_dotenv
import os
import json

# twitter auth
load_dotenv()
bearer_token = os.getenv("BEARER_TOKEN")

# google auth
project_id = os.getenv("PROJECT_ID")
key_filename = os.getenv("KEY_FILENAME")
key_path = f"{key_filename}.json"
credentials = service_account.Credentials.from_service_account_file(
key_path,
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

client = pubsub_v1.PublisherClient(credentials=credentials)
topic_path = client.topic_path(project_id, 'tweets')

class SimpleStreamListener(tweepy.StreamingClient):
    def on_tweet(self, tweet):
        print(f"{tweet.id} {tweet.created_at} ({tweet.author_id}): {tweet.text}")
        print("-"*50)
        tweet_data = json.dumps({'id': tweet.id, 'created_at': tweet.created_at, 'text': tweet.text}, default=str)
        client.publish(topic_path, data=tweet_data.encode('utf-8'))
    
    def on_error(self, status):
        print(status)
        if status == 420:
            return False

stream_listener = SimpleStreamListener(bearer_token)

# add new rules    
rule = tweepy.StreamRule(value="data")
stream_listener.add_rules(rule)

stream_listener.filter(expansions="author_id",tweet_fields="created_at")