Estoy tratando de transmitir datos desde Twitter a un depósito de AWS. La buena noticia es que puedo hacer que los datos se transmitan a mi depósito, pero los datos vienen en trozos de aproximadamente 20 kb (creo que esto puede deberse a algunas configuraciones de manguera de incendios) y no se guardan como JSON incluso después de haberlo especificado en mi python código usando JSON.LOAD. En lugar de guardar como JSON, parece que los datos de mi bucket de S3 no tienen una extensión de archivo y tienen una larga cadena de caracteres alfanuméricos. Creo que puede tener algo que ver con los parámetros que se utilizan en client.put_record()

Cualquier ayuda es muy apreciada!

Encuentre mi código a continuación, que obtuve de github aquí.


from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
import json
import boto3
import time


#Variables that contains the user credentials to access Twitter API
consumer_key = "MY_CONSUMER_KEY"
consumer_secret = "MY_CONSUMER_SECRET"
access_token = "MY_ACCESS_TOKEN"
access_token_secret = "MY_SECRET_ACCESS_TOKEN"


#This is a basic listener that just prints received tweets to stdout.
class StdOutListener(StreamListener):

    def on_data(self, data):
        tweet = json.loads(data)
        try:
            if 'extended_tweet' in tweet.keys():
                #print (tweet['text'])
                message_lst = [str(tweet['id']),
                       str(tweet['user']['name']),
                       str(tweet['user']['screen_name']),
                       tweet['extended_tweet']['full_text'],
                       str(tweet['user']['followers_count']),
                       str(tweet['user']['location']),
                       str(tweet['geo']),
                       str(tweet['created_at']),
                       '\n'
                       ]
                message = '\t'.join(message_lst)
                print(message)
                client.put_record(
                    DeliveryStreamName=delivery_stream,
                    Record={
                    'Data': message
                    }
                )
            elif 'text' in tweet.keys():
                #print (tweet['text'])
                message_lst = [str(tweet['id']),
                       str(tweet['user']['name']),
                       str(tweet['user']['screen_name']),
                       tweet['text'].replace('\n',' ').replace('\r',' '),
                       str(tweet['user']['followers_count']),
                       str(tweet['user']['location']),
                       str(tweet['geo']),
                       str(tweet['created_at']),
                       '\n'
                       ]
                message = '\t'.join(message_lst)
                print(message)
                client.put_record(
                    DeliveryStreamName=delivery_stream,
                    Record={
                    'Data': message
                    }
                )
        except (AttributeError, Exception) as e:
                print (e)
        return True

    def on_error(self, status):
        print (status)
        
        
        
        
        
if __name__ == '__main__':

    #This handles Twitter authetification and the connection to Twitter Streaming API
    listener = StdOutListener()
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)

    #tweets = Table('tweets_ft',connection=conn)
    client = boto3.client('firehose', 
                          region_name='us-east-1',
                          aws_access_key_id='MY ACCESS KEY',
                          aws_secret_access_key='MY SECRET KEY' 
                          )

    delivery_stream = 'my_firehose'
    #This line filter Twitter Streams to capture data by the keywords: 'python', 'javascript', 'ruby'
    #stream.filter(track=['trump'], stall_warnings=True)
    while True:
        try:
            print('Twitter streaming...')
            stream = Stream(auth, listener)
            stream.filter(track=['brexit'], languages=['en'], stall_warnings=True)
        except Exception as e:
            print(e)
            print('Disconnected...')
            time.sleep(5)
            continue   
0
eeno 23 sep. 2020 a las 01:28

2 respuestas

La mejor respuesta

Entonces parece que los archivos venían con formato JSON, solo tuve que abrir los archivos en S3 con Firefox y pude ver el contenido de los archivos. El problema con el tamaño de los archivos se debe a la configuración del búfer de firehose, los tengo configurados al más bajo, por lo que los archivos se enviaban en trozos tan pequeños

0
eeno 26 sep. 2020 a las 17:04

Es posible que haya habilitado la compresión S3 para su firehose. Asegúrese de que la compresión esté deshabilitada si desea almacenar datos json sin procesar en su depósito:

enter image description here

También podría aplicar alguna transformación a su firehose que codifica or otherwise transform sus mensajes json en algún otro formato.

0
Marcin 23 sep. 2020 a las 01:22