데이터 사이언스/데이터 엔지니어링

T아카데미 - 데이터 엔지니어링 기초 (2)트위터 스트리밍 데이터 수집

B1001101 2022. 4. 21. 00:32

강의영상

강의 요약

  1. 데이터 엔지니어 취업을 위해 준비야 할 것
    • Apache Spark: 원하는 회사에서 사용한다면 공부해야 함
    • Hadoop은 알면 좋지만 필수는 아님 (요즘에는 직접 다룰 일 별로 없음)
    • CS 지식, 개발역량도 갖춰야 함
  2. 실습 개요: 무인으로 동작하는 실시간 데이터 파이프라인 만들기
    1. Twitter Streaming Data: Google Kubernetes Engine에 올림
    2. Google Cloud Pub/Sub: Kafka와 유사
      • 완전관리형 실시간 메세징 서비스
      • 택배 분류하는 허브 역할
      • Publisher → Pub/Sub Topic → Subscriber (Kafka: Producer → Broker(Kafka Topic) → Consumer
        • Publisher: 여러가지 채널(Topic)에 데이터를 씀
        • Subscriber: Topic을 구독하고 있다가 새로운 데이터가 들어오면 받아서 활용
      • https://cloud.google.com/pubsub/docs/overview?hl=en
    3. Google Cloud Functions: Serverless
    4. BigQuery: 데이터 웨어하우스
    5. DataStudio: 대시보드(시각화)

실습 코드

  • 강의에 나온 V1 방식대로 따라하니까 구버전이라서 그런지 계속 에러가 발생해서 V2 방식을 시도해봤다.
  • 키 값이 노출되지 않게 하기 위해 .env 파일을 만들어 따로 관리하였다. 파이썬에서 .env 파일에 접근하려면 dotenv 라이브러리의 load_dotenv() 함수를 호출한 후 os.getenv() 함수를 사용하여 불러오면 된다.
  • StreamRule을 한 번 추가하면 종료했다가 다시 실행해도 지워지지 않고 그대로 유지되는 것을 발견했다. Rule을 삭제하려면 get_rules() 함수로 id를 얻은 다음 delete_rules 함수로 삭제해야 한다.
  • 참조
import tweepy
from dotenv import load_dotenv
import os
import json

# twitter auth
load_dotenv()
bearer_token = os.getenv("BEARER_TOKEN")

class SimpleStreamListener(tweepy.StreamingClient):
    def on_tweet(self, tweet):
        print(f"{tweet.id} {tweet.created_at} ({tweet.author_id}): {tweet.text}")
        print("-"*50)
    
    def on_error(self, status):
        print(status)
        if status == 420:
            return False

stream_listener = SimpleStreamListener(bearer_token)

# add new rules    
rule = tweepy.StreamRule(value="data")
stream_listener.add_rules(rule)

stream_listener.filter(expansions="author_id",tweet_fields="created_at")